#include <freeradius-devel/ident.h>
RCSID("$Id$")
#include <freeradius-devel/radiusd.h>
#include <freeradius-devel/modules.h>
#include "rlm_redis.h"
static const CONF_PARSER module_config[] = {
{ "num_connections", PW_TYPE_INTEGER,
offsetof(REDIS_INST, numconnections), NULL, "20"},
{ "hostname", PW_TYPE_STRING_PTR,
offsetof(REDIS_INST, hostname), NULL, "127.0.0.1"},
{ "port", PW_TYPE_INTEGER,
offsetof(REDIS_INST, port), NULL, "6379"},
{ "password", PW_TYPE_STRING_PTR,
offsetof(REDIS_INST, password), NULL, NULL},
{"connect_failure_retry_delay", PW_TYPE_INTEGER,
offsetof(REDIS_INST, connect_failure_retry_delay), NULL, "60"},
{"lifetime", PW_TYPE_INTEGER,
offsetof(REDIS_INST, lifetime), NULL, "0"},
{"max_queries", PW_TYPE_INTEGER,
offsetof(REDIS_INST, max_queries), NULL, "0"},
{ NULL, -1, 0, NULL, NULL}
};
static int redis_close_socket(REDIS_INST *inst, REDISSOCK *dissocket)
{
radlog(L_INFO, "rlm_redis (%s): Closing socket %d",
inst->xlat_name, dissocket->id);
if (dissocket->state == sockconnected) {
redisFree(dissocket->conn);
}
#ifdef HAVE_PTHREAD_H
pthread_mutex_destroy(&dissocket->mutex);
#endif
free(dissocket);
return 1;
}
static int connect_single_socket(REDIS_INST *inst, REDISSOCK *dissocket)
{
radlog(L_INFO, "rlm_redis (%s): Attempting to connect #%d",
inst->xlat_name, dissocket->id);
dissocket->conn = redisConnect(inst->hostname, inst->port);
if (!dissocket->conn->err) {
radlog(L_INFO, "rlm_redis (%s): Connected new DB handle, #%d",
inst->xlat_name, dissocket->id);
dissocket->state = sockconnected;
dissocket->queries = 0;
return 0;
}
if (inst->password) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "AUTH %s", inst->password);
dissocket->reply = redisCommand(dissocket->conn, buffer);
if (!dissocket->reply) {
radlog(L_ERR, "rlm_redis (%s): Failed to run AUTH",
inst->xlat_name);
redis_close_socket(inst, dissocket);
return -1;
}
switch (dissocket->reply->type) {
case REDIS_REPLY_STATUS:
if (strcmp(dissocket->reply->str, "OK") != 0) {
radlog(L_ERR, "rlm_redis (%s): Failed authentication: reply %s",
inst->xlat_name, dissocket->reply->str);
redis_close_socket(inst, dissocket);
return -1;
}
break;
default:
radlog(L_ERR, "rlm_redis (%s): Unexpected reply to AUTH",
inst->xlat_name);
redis_close_socket(inst, dissocket);
return -1;
}
}
radlog(L_CONS | L_ERR, "rlm_redis (%s): Failed to connect DB handle #%d",
inst->xlat_name, dissocket->id);
inst->connect_after = time(NULL) + inst->connect_failure_retry_delay;
dissocket->state = sockunconnected;
return -1;
}
static void redis_poolfree(REDIS_INST * inst)
{
REDISSOCK *cur;
REDISSOCK *next;
for (cur = inst->redispool; cur; cur = next) {
next = cur->next;
redis_close_socket(inst, cur);
}
inst->redispool = NULL;
}
static size_t redis_escape_func(char *out, size_t outlen, const char *in)
{
size_t len = 0;
while (*in) {
if ((*in <= 32) || (*in == '\\')) {
if (outlen <= 3) {
break;
}
snprintf(out, outlen, "=%02X", (unsigned char) in[0]);
in++;
out += 3;
outlen -= 3;
len += 3;
continue;
}
if (outlen <= 1) {
break;
}
*out = *in;
out++;
in++;
outlen--;
len++;
}
*out = '\0';
return len;
}
static int redis_xlat(void *instance, REQUEST *request,
char *fmt, char *out, size_t freespace,
UNUSED RADIUS_ESCAPE_STRING func)
{
REDIS_INST *inst = instance;
REDISSOCK *dissocket;
size_t ret = 0;
char *buffer_ptr;
char buffer[21];
char querystr[MAX_QUERY_LEN];
if (!radius_xlat(querystr, sizeof(querystr), fmt, request,
redis_escape_func)) {
radlog(L_ERR, "rlm_redis (%s): xlat failed.",
inst->xlat_name);
return 0;
}
if ((dissocket = redis_get_socket(inst)) == NULL) {
radlog(L_ERR, "rlm_redis (%s): redis_get_socket() failed",
inst->xlat_name);
return 0;
}
if (rlm_redis_query(dissocket, inst, querystr) < 0) {
rlm_redis_finish_query(dissocket);
redis_release_socket(inst,dissocket);
return 0;
}
switch (dissocket->reply->type) {
case REDIS_REPLY_INTEGER:
buffer_ptr = buffer;
snprintf(buffer_ptr, sizeof(buffer), "%lld",
dissocket->reply->integer);
ret = strlen(buffer_ptr);
break;
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
buffer_ptr = dissocket->reply->str;
ret = dissocket->reply->len;
break;
default:
buffer_ptr = NULL;
break;
}
if ((ret >= freespace) || (buffer_ptr == NULL)) {
RDEBUG("rlm_redis (%s): Can't write result, insufficient space or unsupported result\n",
inst->xlat_name);
rlm_redis_finish_query(dissocket);
redis_release_socket(inst,dissocket);
return 0;
}
strlcpy(out,buffer_ptr,freespace);
rlm_redis_finish_query(dissocket);
redis_release_socket(inst,dissocket);
return ret;
}
static int redis_detach(void *instance)
{
REDIS_INST *inst = instance;
redis_poolfree(inst);
if (inst->xlat_name) {
xlat_unregister(inst->xlat_name, (RAD_XLAT_FUNC)redis_xlat);
free(inst->xlat_name);
}
free(inst->xlat_name);
free(inst);
return 0;
}
static int redis_init_socketpool(REDIS_INST *inst)
{
int i, rcode;
int success = 0;
REDISSOCK *dissocket;
inst->connect_after = 0;
inst->redispool = NULL;
for (i = 0; i < inst->numconnections; i++) {
radlog(L_DBG, "rlm_redis (%s): starting %d",
inst->xlat_name, i);
dissocket = rad_malloc(sizeof (*dissocket));
if (dissocket == NULL) {
return -1;
}
memset(dissocket, 0, sizeof (*dissocket));
dissocket->conn = NULL;
dissocket->id = i;
dissocket->state = sockunconnected;
#ifdef HAVE_PTHREAD_H
rcode = pthread_mutex_init(&dissocket->mutex, NULL);
if (rcode != 0) {
free(dissocket);
radlog(L_ERR, "rlm_redis: Failed to init lock: %s",
strerror(errno));
return 0;
}
#endif
if (time(NULL) > inst->connect_after) {
if (connect_single_socket(inst, dissocket) == 0) {
success = 1;
}
}
dissocket->next = inst->redispool;
inst->redispool = dissocket;
}
inst->last_used = NULL;
if (!success) {
radlog(L_DBG, "rlm_redis (%s): Failed to connect to any redis server.",
inst->xlat_name);
}
return 1;
}
int rlm_redis_query(REDISSOCK *dissocket, REDIS_INST *inst, char *query)
{
if (!query || !*query) {
return -1;
}
DEBUG2("executing query %s", query);
dissocket->reply = redisCommand(dissocket->conn, query);
if (dissocket->reply == NULL) {
radlog(L_ERR, "rlm_redis: (%s) REDIS error: %s",
inst->xlat_name, dissocket->conn->errstr);
if (dissocket->state == sockconnected) {
redisFree(dissocket->conn);
dissocket->state = sockunconnected;
}
if (connect_single_socket(inst, dissocket) < 0) {
radlog(L_ERR, "rlm_redis (%s): reconnect failed, database down?",
inst->xlat_name);
return -1;
}
DEBUG2("executing query %s", query);
dissocket->reply = redisCommand(dissocket->conn, query);
if (dissocket->reply == NULL) {
radlog(L_ERR, "rlm_redis (%s): failed after re-connect",
inst->xlat_name);
return -1;
}
}
if (dissocket->reply->type == REDIS_REPLY_ERROR) {
radlog(L_ERR, "rlm_redis (%s): query failed, %s",
inst->xlat_name, query);
rlm_redis_finish_query(dissocket);
return -1;
}
return 0;
}
int rlm_redis_finish_query(REDISSOCK *dissocket)
{
if (dissocket == NULL) {
return -1;
}
if (dissocket->reply != NULL) {
freeReplyObject(dissocket->reply);
} else {
return -1;
}
return 0;
}
static time_t last_logged_failure = 0;
REDISSOCK *redis_get_socket(REDIS_INST *inst)
{
REDISSOCK *cur, *start;
int tried_to_connect = 0;
int unconnected = 0;
time_t now = time(NULL);
start = inst->last_used;
if (!start) start = inst->redispool;
cur = start;
while (cur) {
#ifdef HAVE_PTHREAD_H
if (pthread_mutex_trylock(&cur->mutex) != 0) {
goto next;
}
#endif
if (inst->lifetime && (cur->state == sockconnected) &&
((cur->connected + inst->lifetime) < now)) {
DEBUG2("Closing socket %d as its lifetime has been exceeded", cur->id);
redis_close_socket(inst, cur);
cur->state = sockunconnected;
goto reconnect;
}
if (inst->max_queries && (cur->state == sockconnected) &&
(cur->queries >= inst->max_queries)) {
DEBUG2("Closing socket %d as its max_queries has been exceeded", cur->id);
redis_close_socket(inst, cur);
cur->state = sockunconnected;
goto reconnect;
}
if ((cur->state == sockunconnected) && (now > inst->connect_after)) {
reconnect:
radlog(L_INFO, "rlm_redis (%s): Trying to (re)connect unconnected handle %d..", inst->xlat_name, cur->id);
tried_to_connect++;
connect_single_socket(inst, cur);
}
if (cur->state == sockunconnected) {
DEBUG("rlm_redis (%s): Ignoring unconnected handle %d..", inst->xlat_name, cur->id);
unconnected++;
#ifdef HAVE_PTHREAD_H
pthread_mutex_unlock(&cur->mutex);
#endif
goto next;
}
DEBUG("rlm_redis (%s): Reserving redis socket id: %d",
inst->xlat_name, cur->id);
if (unconnected != 0 || tried_to_connect != 0) {
DEBUG("rlm_redis (%s): got socket %d after skipping %d unconnected handles, tried to reconnect %d though",
inst->xlat_name, cur->id, unconnected, tried_to_connect);
}
inst->last_used = cur->next;
cur->queries++;
return cur;
next:
cur = cur->next;
if (!cur) {
cur = inst->redispool;
}
if (cur == start) {
break;
}
}
if (now <= last_logged_failure) return NULL;
last_logged_failure = now;
radlog(L_INFO, "rlm_redis (%s): There are no DB handles to use! skipped %d, tried to connect %d",
inst->xlat_name, unconnected, tried_to_connect);
return NULL;
}
int redis_release_socket(REDIS_INST *inst, REDISSOCK *dissocket)
{
#ifdef HAVE_PTHREAD_H
pthread_mutex_unlock(&dissocket->mutex);
#endif
radlog(L_DBG, "rlm_redis (%s): Released redis socket id: %d",
inst->xlat_name, dissocket->id);
return 0;
}
static int redis_instantiate(CONF_SECTION *conf, void **instance)
{
REDIS_INST *inst;
const char *xlat_name;
inst = rad_malloc(sizeof (REDIS_INST));
if (!inst) {
return -1;
}
memset(inst, 0, sizeof (*inst));
if (cf_section_parse(conf, inst, module_config) < 0) {
free(inst);
return -1;
}
xlat_name = cf_section_name2(conf);
if (!xlat_name)
xlat_name = cf_section_name1(conf);
inst->xlat_name = strdup(xlat_name);
xlat_register(inst->xlat_name, (RAD_XLAT_FUNC)redis_xlat, inst);
if (redis_init_socketpool(inst) < 0) {
redis_detach(inst);
return -1;
}
inst->redis_query = rlm_redis_query;
inst->redis_finish_query = rlm_redis_finish_query;
inst->redis_get_socket = redis_get_socket;
inst->redis_release_socket = redis_release_socket;
inst->redis_escape_func = redis_escape_func;
*instance = inst;
return 0;
}
module_t rlm_redis = {
RLM_MODULE_INIT,
"redis",
RLM_TYPE_THREAD_SAFE,
redis_instantiate,
redis_detach,
{
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
},
};