#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "mio.h"
#include "util/inaddr.h"
#ifdef MIO_POLL
#include "mio_poll.h"
#endif
#ifdef MIO_SELECT
#include "mio_select.h"
#endif
/**
 * State of one descriptor slot.
 *
 * type_CLOSED is zero, so a zeroed slot counts as unused. The CONNECT
 * variants are the type_CONNECT bit combined with a low bit recording
 * that a read (0x01) or write (0x02) was requested while the connect
 * was still in progress.
 */
typedef enum {
    type_CLOSED        = 0x00,
    type_NORMAL        = 0x01,
    type_LISTEN        = 0x02,
    type_CONNECT       = 0x10,
    type_CONNECT_READ  = type_CONNECT | 0x01,
    type_CONNECT_WRITE = type_CONNECT | 0x02
} mio_type_t;
/** Per-descriptor bookkeeping; one entry per slot in the fds table. */
struct mio_fd_st
{
/* current state of the slot (type_CLOSED when unused) */
mio_type_t type;
/* handler invoked through the ACT macro for events on this descriptor */
mio_handler_t app;
/* opaque pointer handed back to the handler as its last argument */
void *arg;
};
/** The multiplexer instance: a table of descriptor slots indexed by fd. */
struct mio_st
{
/* slot table, indexed directly by descriptor number */
struct mio_fd_st *fds;
/* number of slots in fds; descriptors >= maxfd are refused by mio_fd() */
int maxfd;
/* highest descriptor registered so far; upper bound of the scan in mio_run() */
int highfd;
/* extra members contributed by the backend (presumably poll/select state; defined outside this file) */
MIO_VARS
};
/* Slot for descriptor f in m's table. Usable as an lvalue; arguments are
 * parenthesized so expressions such as FD(&obj, i + 1) expand correctly. */
#define FD(m,f) ((m)->fds[(f)])
/* Call the handler registered for descriptor f with action a and data d.
 * Note that m and f are evaluated more than once. */
#define ACT(m,f,a,d) ((*(FD(m,f).app))((m),(a),(f),(d),FD(m,f).arg))
/* Location tag passed as the first argument of mio_debug(). */
#define ZONE __LINE__
#ifndef MIO_DEBUG
#define MIO_DEBUG 0
#endif
/* Expands to a bare `if`, so never follow an unbraced
 * `if(x) mio_debug(...);` with an `else`: it would bind to this `if`. */
#define mio_debug if(MIO_DEBUG) _mio_debug
/**
 * Write one debug line to stderr in the form "mio.c#<line>: <message>\n".
 *
 * Normally reached through the mio_debug macro, which supplies ZONE
 * (the source line) as the first argument.
 *
 * @param line    source line number to print in the prefix
 * @param msgfmt  printf-style format string for the message
 * @param ...     arguments consumed by msgfmt
 */
void _mio_debug(int line, const char *msgfmt, ...)
{
    va_list ap;

    fprintf(stderr, "mio.c#%d: ", line);

    va_start(ap, msgfmt);
    vfprintf(stderr, msgfmt, ap);
    va_end(ap); /* every va_start must be paired with va_end */

    fprintf(stderr, "\n");
}
/* Backend-provided definitions; MIO_FUNCS is not defined in this file
 * (presumably it comes from mio_poll.h or mio_select.h included above). */
MIO_FUNCS
/**
 * Stop tracking a descriptor, notify its handler, and close it.
 *
 * The handler receives action_CLOSE before the descriptor is closed.
 * Afterwards the slot is zeroed, which leaves it as type_CLOSED.
 *
 * @param m   multiplexer owning the descriptor
 * @param fd  descriptor to close
 */
void mio_close(mio_t m, int fd)
{
    mio_debug(ZONE,"actually closing fd #%d",fd);

    /* take it out of the backend's watch set first */
    MIO_REMOVE_FD(m, fd);

    /* let the application know while the slot is still populated */
    ACT(m, fd, action_CLOSE, NULL);

    close(fd);

    /* wipe the slot; an all-zero slot reads as type_CLOSED */
    memset(&FD(m,fd), 0, sizeof FD(m,fd));
}
/**
 * Accept one pending connection on a listening descriptor.
 *
 * The accepted descriptor is registered with the same handler and
 * argument as the listener. The handler is then called with
 * action_ACCEPT and the address string produced by j_inet_ntop();
 * a non-zero return rejects the connection, which is then closed
 * and its slot cleared.
 *
 * @param m   multiplexer owning the listener
 * @param fd  listening descriptor that is ready for reading
 */
void _mio_accept(mio_t m, int fd)
{
    struct sockaddr_storage serv_addr;
    socklen_t addrlen = (socklen_t) sizeof(serv_addr);
    int newfd, dupfd;
    char ip[INET6_ADDRSTRLEN];

    mio_debug(ZONE, "accepting on fd #%d", fd);

    /* accept() fails with -1 only; descriptor 0 is a valid result (for
     * example when stdin has been closed) and must not be dropped */
    newfd = accept(fd, (struct sockaddr*)&serv_addr, &addrlen);
    if(newfd < 0) return;
    if(addrlen <= 0) {
        close(newfd);
        return;
    }

    j_inet_ntop(&serv_addr, ip, sizeof(ip));
    mio_debug(ZONE, "new socket accepted fd #%d, %s:%d", newfd, ip, j_inet_getport(&serv_addr));

    /* register under the listener's handler */
    if(mio_fd(m, newfd, FD(m,fd).app, FD(m,fd).arg) < 0)
    {
        /* registration failed; retry once with a duplicate, which may
         * land on a different descriptor number */
        dupfd = dup(newfd);
        close(newfd);

        if(dupfd < 0 || mio_fd(m, dupfd, FD(m,fd).app, FD(m,fd).arg) < 0) {
            mio_debug(ZONE,"failed to add fd");
            if(dupfd >= 0) close(dupfd);
            return;
        }

        newfd = dupfd;
    }

    /* give the application the chance to veto the connection */
    if (ACT(m, newfd, action_ACCEPT, ip))
    {
        mio_debug(ZONE, "accept was rejected for %s:%d", ip, newfd);
        MIO_REMOVE_FD(m, newfd);

        /* closed without an action_CLOSE callback: the app rejected it */
        close(newfd);
        memset(&FD(m, newfd), 0, sizeof(struct mio_fd_st));
    }

    return;
}
/**
 * Promote a connecting descriptor to a normal one.
 *
 * The slot becomes type_NORMAL, the write interest that signalled
 * connect completion is dropped, and requests recorded in the old
 * type are replayed through mio_read() and mio_write().
 *
 * NOTE(review): type_CONNECT_READ (0x11) and type_CONNECT_WRITE (0x12)
 * both contain the type_CONNECT bit (0x10), so each mask test below is
 * also non-zero for a plain type_CONNECT. Confirm whether replaying both
 * unconditionally is intended before narrowing the masks; callers may
 * rely on the resulting write callback as their connect notification.
 *
 * @param m   multiplexer owning the descriptor
 * @param fd  descriptor whose connect has progressed
 */
void _mio_connect(mio_t m, int fd)
{
    /* remember the pre-reset state; it carries the deferred requests */
    const mio_type_t pending = FD(m,fd).type;

    mio_debug(ZONE, "connect processing for fd #%d", fd);

    FD(m,fd).type = type_NORMAL;
    MIO_UNSET_WRITE(m,fd);

    if((pending & type_CONNECT_READ) != 0)
        mio_read(m,fd);

    if((pending & type_CONNECT_WRITE) != 0)
        mio_write(m,fd);
}
/**
 * Register an existing descriptor with the multiplexer.
 *
 * The slot is set to type_NORMAL with the given handler and argument,
 * the backend is told about the descriptor, and the descriptor is
 * switched to non-blocking mode where fcntl or ioctl is available.
 *
 * @param m    multiplexer to register with
 * @param fd   descriptor to register; must satisfy 0 <= fd < m->maxfd
 * @param app  handler to call for events on fd
 * @param arg  opaque pointer passed back to the handler
 * @return fd on success, -1 if m is NULL or fd is outside the table
 */
int mio_fd(mio_t m, int fd, mio_handler_t app, void *arg)
{
    mio_debug(ZONE, "adding fd #%d", fd);

    /* a NULL mio or a negative fd would index outside the slot table */
    if(m == NULL || fd < 0)
    {
        mio_debug(ZONE,"invalid mio or fd");
        return -1;
    }

    if(fd >= m->maxfd)
    {
        mio_debug(ZONE,"fd to high");
        return -1;
    }

    FD(m,fd).type = type_NORMAL;
    FD(m,fd).app = app;
    FD(m,fd).arg = arg;
    MIO_INIT_FD(m, fd);

    /* best effort: failure to go non-blocking does not fail registration */
#if defined(HAVE_FCNTL)
    {
        int flags = fcntl(fd, F_GETFL);

        /* only write flags back when the read succeeded */
        if(flags >= 0)
            fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }
#elif defined(HAVE_IOCTL)
    {
        int flags = 1;

        ioctl(fd, FIONBIO, &flags);
    }
#endif

    if(fd > m->highfd) m->highfd = fd;

    return fd;
}
/**
 * Replace the handler and handler argument of a registered descriptor.
 *
 * @param m    multiplexer owning the descriptor
 * @param fd   descriptor whose slot is updated
 * @param app  new handler
 * @param arg  new opaque pointer passed back to the handler
 */
void mio_app(mio_t m, int fd, mio_handler_t app, void *arg)
{
    struct mio_fd_st *slot = &FD(m,fd);

    slot->app = app;
    slot->arg = arg;
}
/**
 * Run one pass of the event loop.
 *
 * Asks the backend (MIO_CHECK) for ready descriptors, then walks every
 * slot from 0 to highfd and dispatches: listeners accept, connecting
 * descriptors are promoted, and normal descriptors get action_READ and
 * action_WRITE callbacks. A handler returning 0 drops the matching
 * read or write interest.
 *
 * @param m        multiplexer to run
 * @param timeout  passed through to MIO_CHECK
 */
void mio_run(mio_t m, int timeout)
{
    int retval, fd;

    mio_debug(ZONE, "mio running for %d", timeout);

    retval = MIO_CHECK(m, timeout);
    if(retval <= 0)
    {
        /* zero: nothing ready; negative: the check itself failed */
        if(retval < 0)
        {
            mio_debug(ZONE, "MIO_CHECK returned an error (%d)", MIO_ERROR(m));
        }
        return;
    }

    mio_debug(ZONE,"mio working: %d",retval);

    for(fd = 0; fd <= m->highfd; fd++)
    {
        /* unused slot */
        if(FD(m,fd).type == type_CLOSED)
            continue;

        /* listener with a pending connection */
        if(FD(m,fd).type == type_LISTEN && MIO_CAN_READ(m, fd))
        {
            _mio_accept(m, fd);
            continue;
        }

        /* outbound connect that has made progress */
        if((FD(m,fd).type & type_CONNECT) != 0 && (MIO_CAN_READ(m, fd) || MIO_CAN_WRITE(m, fd)))
        {
            _mio_connect(m, fd);
            continue;
        }

        /* the type is re-read before each callback because a handler may
         * have changed the slot (for example by closing the descriptor) */
        if(FD(m,fd).type == type_NORMAL && MIO_CAN_READ(m, fd))
        {
            if(ACT(m, fd, action_READ, NULL) == 0)
                MIO_UNSET_READ(m, fd);
        }

        if(FD(m,fd).type == type_NORMAL && MIO_CAN_WRITE(m, fd))
        {
            if(ACT(m, fd, action_WRITE, NULL) == 0)
                MIO_UNSET_WRITE(m, fd);
        }
    }
}
/**
 * Create a multiplexer able to track up to maxfd descriptors.
 *
 * The process's RLIMIT_NOFILE soft limit is set to min(maxfd, OPEN_MAX)
 * and then read back; the value read back becomes the table size.
 *
 * @param maxfd  requested number of descriptor slots
 * @return a new multiplexer, or NULL if the resulting size is not
 *         positive or memory allocation fails
 */
mio_t mio_new(int maxfd)
{
    mio_t m;
    struct rlimit rl = {-1, -1};

    /* adjust the soft limit, then trust whatever the system reports back */
    getrlimit(RLIMIT_NOFILE, &rl);
    rl.rlim_cur = maxfd < OPEN_MAX ? maxfd : OPEN_MAX;
    setrlimit (RLIMIT_NOFILE, &rl);
    getrlimit(RLIMIT_NOFILE, &rl);
    mio_debug(ZONE,"new open file limit: %lu", (long unsigned) rl.rlim_cur);
    mio_debug(ZONE,"new open file max: %lu", (long unsigned) rl.rlim_max);

    maxfd = (int)rl.rlim_cur;
    mio_debug(ZONE,"maxfd is now: %i", maxfd);

    /* a zero or negative count (for example an unlimited rlimit narrowed
     * to int) must not reach the allocation size */
    if(maxfd <= 0)
    {
        mio_debug(ZONE,"internal error creating new mio");
        return NULL;
    }

    if((m = malloc(sizeof(struct mio_st))) == NULL) return NULL;

    /* calloc zeroes the table (all slots type_CLOSED) and checks the
     * count * size multiplication for overflow */
    if((m->fds = calloc((size_t) maxfd, sizeof(struct mio_fd_st))) == NULL)
    {
        mio_debug(ZONE,"internal error creating new mio");
        free(m);
        return NULL;
    }

    m->maxfd = maxfd;
    m->highfd = 0;

    MIO_INIT_VARS(m);

    return m;
}
/**
 * Release a multiplexer created by mio_new().
 *
 * Backend state is released through MIO_FREE_VARS, then the slot table
 * and the instance itself are freed. Descriptors are not closed here.
 * Passing NULL is a no-op, matching free() and the NULL checks in the
 * other entry points.
 *
 * @param m  multiplexer to free, or NULL
 */
void mio_free(mio_t m)
{
    if(m == NULL) return;

    MIO_FREE_VARS(m);

    free(m->fds);
    free(m);
}
/**
 * Ask to be told when a descriptor becomes readable.
 *
 * While a connect is still in progress the request is only recorded in
 * the slot type; otherwise read interest is set in the backend.
 *
 * @param m   multiplexer owning the descriptor (NULL is ignored)
 * @param fd  descriptor to watch (negative values are ignored)
 */
void mio_read(mio_t m, int fd)
{
    if(m == NULL || fd < 0)
        return;

    if((FD(m,fd).type & type_CONNECT) == 0)
    {
        /* not connecting: arm the read event right away */
        MIO_SET_READ(m, fd);
        return;
    }

    /* still connecting: remember the request for later */
    FD(m,fd).type |= type_CONNECT_READ;
}
/**
 * Ask to write on a descriptor.
 *
 * While a connect is still in progress the request is only recorded in
 * the slot type. For a normal descriptor the handler is called with
 * action_WRITE immediately; write interest is set in the backend only
 * if the handler returns non-zero. Any other slot type is ignored.
 *
 * @param m   multiplexer owning the descriptor (NULL is ignored)
 * @param fd  descriptor to write on (negative values are ignored)
 */
void mio_write(mio_t m, int fd)
{
    if(m == NULL || fd < 0)
        return;

    if((FD(m,fd).type & type_CONNECT) != 0)
    {
        /* still connecting: remember the request for later */
        FD(m,fd).type |= type_CONNECT_WRITE;
    }
    else if(FD(m,fd).type == type_NORMAL)
    {
        /* try the write now; watch for writability only if asked to */
        if(ACT(m, fd, action_WRITE, NULL) != 0)
        {
            MIO_SET_WRITE(m, fd);
        }
    }
}
/**
 * Create a listening TCP socket and register it with the multiplexer.
 *
 * @param m         multiplexer to register with
 * @param port      port number stored in the address via j_inet_setport()
 * @param sourceip  address to bind to, parsed by j_inet_pton(); when NULL
 *                  a zeroed AF_INET address is used
 * @param app       handler for the listener and for the connections
 *                  accepted from it
 * @param arg       opaque pointer passed back to the handler
 * @return the listening descriptor, or -1 on any failure (the socket is
 *         closed before returning)
 */
int mio_listen(mio_t m, int port, char *sourceip, mio_handler_t app, void *arg)
{
    int fd, flag = 1;
    struct sockaddr_storage sa;

    if(m == NULL) return -1;

    /* never hand a NULL pointer to %s */
    mio_debug(ZONE, "mio to listen on %d [%s]", port, sourceip != NULL ? sourceip : "(null)");

    memset(&sa, 0, sizeof(sa));

    /* same failure test as mio_connect() uses for j_inet_pton() */
    if(sourceip != NULL && j_inet_pton(sourceip, &sa) <= 0)
        return -1;

    /* no address given (or family left unset): fall back to IPv4 */
    if(sa.ss_family == 0)
        sa.ss_family = AF_INET;

    if((fd = socket(sa.ss_family,SOCK_STREAM,0)) < 0) return -1;

    /* from here on every failure must release the socket */
    if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(flag)) < 0)
        goto fail;

    j_inet_setport(&sa, port);
    if(bind(fd,(struct sockaddr*)&sa,j_inet_addrlen(&sa)) < 0)
        goto fail;

    if(listen(fd, 10) < 0)
        goto fail;

    if(mio_fd(m, fd, app, arg) < 0)
        goto fail;

    /* mark as a listener and start watching for incoming connections */
    FD(m,fd).type = type_LISTEN;
    mio_read(m,fd);

    return fd;

fail:
    close(fd);
    return -1;
}
/**
 * Start a non-blocking TCP connect and register the socket.
 *
 * If connect() completes immediately the descriptor is registered as a
 * normal one. If it reports EINPROGRESS the descriptor is registered as
 * type_CONNECT with write interest set, and is promoted later from
 * mio_run().
 *
 * @param m       multiplexer to register with
 * @param port    destination port; must be positive
 * @param hostip  destination address, parsed by j_inet_pton()
 * @param app     handler for events on the new descriptor
 * @param arg     opaque pointer passed back to the handler
 * @return the descriptor, or -1 on bad arguments or any failure (the
 *         socket is closed before returning)
 */
int mio_connect(mio_t m, int port, char *hostip, mio_handler_t app, void *arg)
{
    int fd, flag, err;
    struct sockaddr_storage sa;

    memset(&sa, 0, sizeof(sa));

    if(m == NULL || port <= 0 || hostip == NULL) return -1;

    mio_debug(ZONE, "mio connecting to %s, port=%d",hostip,port);

    if(j_inet_pton(hostip, &sa)<=0)
        return -1;

    if((fd = socket(sa.ss_family,SOCK_STREAM,0)) < 0) return -1;

    /* go non-blocking before connect() so the call cannot stall */
#if defined(HAVE_FCNTL)
    {
        int flags = fcntl(fd, F_GETFL);

        /* only write flags back when the read succeeded */
        if(flags >= 0)
            fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }
#elif defined(HAVE_IOCTL)
    {
        int flags = 1;

        ioctl(fd, FIONBIO, &flags);
    }
#endif

    j_inet_setport(&sa, port);

    flag = connect(fd,(struct sockaddr*)&sa,j_inet_addrlen(&sa));
    /* capture errno now: the debug output below may overwrite it */
    err = errno;

    mio_debug(ZONE, "connect returned %d and %s", flag, strerror(err));

    /* connected straight away */
    if(flag == 0 && mio_fd(m,fd,app,arg) == fd) return fd;

    /* connect still under way: completion is signalled by writability */
    if(flag == -1 && err == EINPROGRESS && mio_fd(m,fd,app,arg) == fd)
    {
        mio_debug(ZONE, "connect processing non-blocking mode");

        FD(m,fd).type = type_CONNECT;
        MIO_SET_WRITE(m,fd);
        return fd;
    }

    /* hard failure, or the descriptor could not be registered */
    close(fd);

    return -1;
}