#include "config.h"
#ifdef USE_CACHEDB
#include "cachedb/redis.h"
#include "cachedb/cachedb.h"
#include "util/alloc.h"
#include "util/config_file.h"
#include "util/locks.h"
#include "util/timeval_func.h"
#include "sldns/sbuffer.h"
#ifdef USE_REDIS
#include "hiredis/hiredis.h"
/**
 * Backend state of the redis cachedb module. It is allocated zeroed in
 * redis_init and released by moddata_clean.
 * Most settings exist twice: once for the primary server and once, with
 * the replica_ prefix, for an optional replica server.
 */
struct redis_moddata {
/** array of numctxs connections to the primary, indexed by thread number;
 * an entry is NULL while that thread has no open connection */
redisContext** ctxs;
/** same as ctxs but for the replica; NULL when neither a replica host nor
 * a replica path is configured. If set, it serves the non-write commands */
redisContext** replica_ctxs;
/** number of entries in ctxs (and replica_ctxs), taken from num_threads */
int numctxs;
/** host names, copied by pointer from the configuration */
const char* server_host;
const char* replica_server_host;
/** TCP ports, used when no socket path is set */
int server_port;
int replica_server_port;
/** unix socket paths; when non-empty they are used instead of host/port */
const char* server_path;
const char* replica_server_path;
/** passwords; when non-empty an AUTH command is sent after connecting */
const char* server_password;
const char* replica_server_password;
/** timeout applied to commands on an established connection */
struct timeval command_timeout;
struct timeval replica_command_timeout;
/** timeout applied to setting up a connection */
struct timeval connect_timeout;
struct timeval replica_connect_timeout;
/** how long reconnects are held off after too many failures;
 * redis_init sets this to 1000 milliseconds */
struct timeval reconnect_interval;
struct timeval replica_reconnect_interval;
/** failed connection attempts since the last successful connect */
int reconnect_attempts;
int replica_reconnect_attempts;
/** locks that guard reconnect_wait and replica_reconnect_wait */
lock_basic_type wait_lock;
lock_basic_type replica_wait_lock;
/** point in time before which no new connection attempt is made */
struct timeval reconnect_wait;
struct timeval replica_reconnect_wait;
/** logical database number; a SELECT is sent when it is larger than 0 */
int logical_db;
int replica_logical_db;
/** set to 1 by redis_init when the test "SET ... EX" command succeeded */
int set_with_ex_available;
};
/** Once the count of failed connection attempts exceeds this limit,
 * redis_connect stores a wait time and refuses to connect until it has
 * passed. */
#define REDIS_RECONNECT_ATTEMPT_LIMIT 3
/** Forward declaration: redis_init sends a command through redis_command,
 * which is defined further down in this file. */
static redisReply* redis_command(struct module_env*, struct cachedb_env*,
const char*, const uint8_t*, size_t, int);
/**
 * Release a redis_moddata structure and everything it owns: the open
 * connections of the primary and replica arrays, the arrays themselves,
 * both wait locks and finally the structure.
 * @param moddata: address of the pointer to release. May be NULL or point
 *	to NULL, in which case nothing happens. Set to NULL on return.
 */
static void
moddata_clean(struct redis_moddata** moddata) {
	struct redis_moddata* d;
	int idx;

	if(!moddata)
		return;
	d = *moddata;
	if(!d)
		return;

	/* close the connections to the primary server */
	if(d->ctxs) {
		for(idx = 0; idx < d->numctxs; idx++) {
			if(d->ctxs[idx])
				redisFree(d->ctxs[idx]);
		}
		free(d->ctxs);
	}

	/* close the connections to the replica server, if there are any */
	if(d->replica_ctxs) {
		for(idx = 0; idx < d->numctxs; idx++) {
			if(d->replica_ctxs[idx])
				redisFree(d->replica_ctxs[idx]);
		}
		free(d->replica_ctxs);
	}

	lock_basic_destroy(&d->wait_lock);
	lock_basic_destroy(&d->replica_wait_lock);
	free(d);
	*moddata = NULL;
}
/**
 * Obtain the time used for the reconnect bookkeeping.
 * @param now_tv: time supplied by the caller, or NULL to read the system
 *	clock with gettimeofday.
 * @param now_val: receives the time. If the clock cannot be read the
 *	error is logged and the value is set to zero, so that the caller
 *	never works with an indeterminate timeval.
 */
static void
redis_current_time(const struct timeval* now_tv, struct timeval* now_val)
{
	if(now_tv) {
		*now_val = *now_tv;
		return;
	}
	if(gettimeofday(now_val, NULL) < 0) {
		log_err("redis: gettimeofday: %s", strerror(errno));
		now_val->tv_sec = 0;
		now_val->tv_usec = 0;
	}
}

/**
 * Open a connection to a redis server and prepare it for use.
 * After connecting, the command timeout is set, an AUTH is sent if a
 * password is given and a SELECT is sent if logical_db is larger than 0.
 *
 * Failures are counted in *reconnect_attempts. Once that counter exceeds
 * REDIS_RECONNECT_ATTEMPT_LIMIT, the time now + reconnect_interval is
 * stored in *reconnect_wait and later calls return NULL without trying
 * to connect until that time has passed. A successful connect resets the
 * counter to 0.
 *
 * @param host: server host name, used when path is NULL or empty.
 * @param port: server port, used together with host.
 * @param path: unix socket path; when non-empty it is used instead of
 *	host and port.
 * @param password: password for AUTH; NULL or empty to skip AUTH.
 * @param logical_db: database number for SELECT; skipped when <= 0.
 * @param connect_timeout: timeout for establishing the connection.
 * @param command_timeout: timeout installed on the new connection.
 * @param reconnect_interval: length of the wait after too many failures.
 * @param reconnect_attempts: failure counter, updated by this function.
 * @param reconnect_wait: wait deadline; accessed under wait_lock.
 * @param wait_lock: lock that guards reconnect_wait.
 * @param now_tv: current time, or NULL to use the system clock.
 * @param infostr: text inserted in log messages, "" or "replica ".
 * @return the connected context, or NULL on failure or while waiting.
 */
static redisContext*
redis_connect(const char* host, int port, const char* path,
	const char* password, int logical_db,
	const struct timeval connect_timeout,
	const struct timeval command_timeout,
	const struct timeval* reconnect_interval,
	int* reconnect_attempts,
	struct timeval* reconnect_wait,
	lock_basic_type* wait_lock,
	struct timeval* now_tv,
	const char* infostr)
{
	struct timeval now_val;
	redisContext* ctx;

	/* Too many failures before: see if the wait time is still running.
	 * Only the timeval is read under the lock. */
	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
		struct timeval wait_tv;
		redis_current_time(now_tv, &now_val);
		lock_basic_lock(wait_lock);
		wait_tv = *reconnect_wait;
		lock_basic_unlock(wait_lock);
		if(timeval_smaller(&now_val, &wait_tv)) {
			verbose(VERB_ALGO, "redis %sdown, reconnect wait",
				infostr);
			return NULL;
		}
	}

	/* a configured socket path takes precedence over host and port */
	if(path && path[0]!=0) {
		ctx = redisConnectUnixWithTimeout(path, connect_timeout);
	} else {
		ctx = redisConnectWithTimeout(host, port, connect_timeout);
	}
	if(!ctx || ctx->err) {
		const char *errstr = "out of memory";
		if(ctx)
			errstr = ctx->errstr;
		log_err("failed to connect to redis %sserver: %s", infostr, errstr);
		goto fail;
	}
	if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
		log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr);
		goto fail;
	}
	if(password && password[0]!=0) {
		redisReply* rep;
		rep = redisCommand(ctx, "AUTH %s", password);
		if(!rep || rep->type == REDIS_REPLY_ERROR) {
			log_err("failed to authenticate %swith password", infostr);
			freeReplyObject(rep);
			goto fail;
		}
		freeReplyObject(rep);
	}
	if(logical_db > 0) {
		redisReply* rep;
		rep = redisCommand(ctx, "SELECT %d", logical_db);
		if(!rep || rep->type == REDIS_REPLY_ERROR) {
			log_err("failed %sto set logical database (%d)",
				infostr, logical_db);
			freeReplyObject(rep);
			goto fail;
		}
		freeReplyObject(rep);
	}

	/* connected: start counting failures from zero again */
	*reconnect_attempts = 0;
	if(verbosity >= VERB_OPS) {
		/* a leading space followed by the port number */
		char port_str[6+1];
		port_str[0] = ' ';
		(void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
		verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)",
			infostr,
			path&&path[0]!=0?path:host,
			path&&path[0]!=0?"":port_str);
	}
	return ctx;

fail:
	if(ctx)
		redisFree(ctx);
	(*reconnect_attempts)++;
	/* over the limit: store the time until which reconnects are held off */
	if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
		struct timeval tv;
		redis_current_time(now_tv, &now_val);
		tv = now_val;
		timeval_add(&tv, reconnect_interval);
		lock_basic_lock(wait_lock);
		*reconnect_wait = tv;
		lock_basic_unlock(wait_lock);
		verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d",
			infostr, (int)tv.tv_sec, (int)tv.tv_usec);
	}
	return NULL;
}
/**
 * Fill a timeval from a timeout given in milliseconds.
 * @param timeout: the timeval to fill in.
 * @param value: the timeout to use when explicit_value is 0.
 * @param explicit_value: if not 0, this timeout is used instead of value.
 */
static void
set_timeout(struct timeval* timeout, int value, int explicit_value)
{
	int msec = value;

	if(explicit_value != 0)
		msec = explicit_value;
	/* whole seconds, and the remaining milliseconds as microseconds */
	timeout->tv_sec = msec / 1000;
	timeout->tv_usec = (msec % 1000) * 1000;
}
/**
 * Initialize the redis backend: allocate the module data, copy the
 * settings from the configuration, connect once per thread to the primary
 * (and to the replica if one is configured) and store the result in
 * cachedb_env->backend_data.
 * A failed connection is logged but does not fail the initialization;
 * redis_command retries the connection later. If redis-expire-records is
 * enabled and this thread has a connection, a test "SET ... EX" command is
 * sent; if that fails it is logged and set_with_ex_available stays 0.
 * @param env: module environment, provides the configuration, the time
 *	and the thread number.
 * @param cachedb_env: cachedb environment that receives the backend data.
 * @return 0 if memory allocation failed, otherwise 1.
 */
static int
redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
{
	struct redis_moddata* moddata = NULL;
	int use_replica;
	int thread;

	verbose(VERB_OPS, "Redis initialization");

	moddata = calloc(1, sizeof(struct redis_moddata));
	if(!moddata) {
		log_err("out of memory");
		goto fail;
	}

	/* each wait lock guards its own reconnect wait time */
	lock_basic_init(&moddata->wait_lock);
	lock_protect(&moddata->wait_lock, &moddata->reconnect_wait,
		sizeof(moddata->reconnect_wait));
	lock_basic_init(&moddata->replica_wait_lock);
	lock_protect(&moddata->replica_wait_lock,
		&moddata->replica_reconnect_wait,
		sizeof(moddata->replica_reconnect_wait));

	/* the strings are referenced from the configuration, not copied */
	moddata->numctxs = env->cfg->num_threads;
	moddata->server_host = env->cfg->redis_server_host;
	moddata->replica_server_host = env->cfg->redis_replica_server_host;
	moddata->server_port = env->cfg->redis_server_port;
	moddata->replica_server_port = env->cfg->redis_replica_server_port;
	moddata->server_path = env->cfg->redis_server_path;
	moddata->replica_server_path = env->cfg->redis_replica_server_path;
	moddata->server_password = env->cfg->redis_server_password;
	moddata->replica_server_password = env->cfg->redis_replica_server_password;

	/* a specific command or connect timeout overrides the general one */
	set_timeout(&moddata->command_timeout,
		env->cfg->redis_timeout,
		env->cfg->redis_command_timeout);
	set_timeout(&moddata->replica_command_timeout,
		env->cfg->redis_replica_timeout,
		env->cfg->redis_replica_command_timeout);
	set_timeout(&moddata->connect_timeout,
		env->cfg->redis_timeout,
		env->cfg->redis_connect_timeout);
	set_timeout(&moddata->replica_connect_timeout,
		env->cfg->redis_replica_timeout,
		env->cfg->redis_replica_connect_timeout);
	set_timeout(&moddata->reconnect_interval, 1000, 0);
	set_timeout(&moddata->replica_reconnect_interval, 1000, 0);

	moddata->logical_db = env->cfg->redis_logical_db;
	moddata->replica_logical_db = env->cfg->redis_replica_logical_db;

	moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
	if(!moddata->ctxs) {
		log_err("out of memory");
		goto fail;
	}

	/* the replica is in use when it has a host name or a socket path */
	use_replica =
		(moddata->replica_server_host &&
			moddata->replica_server_host[0]!=0) ||
		(moddata->replica_server_path &&
			moddata->replica_server_path[0]!=0);
	if(use_replica) {
		moddata->replica_ctxs = calloc(env->cfg->num_threads,
			sizeof(redisContext*));
		if(!moddata->replica_ctxs) {
			log_err("out of memory");
			goto fail;
		}
	}

	/* one connection to the primary for every thread */
	for(thread = 0; thread < moddata->numctxs; thread++) {
		moddata->ctxs[thread] = redis_connect(
			moddata->server_host,
			moddata->server_port,
			moddata->server_path,
			moddata->server_password,
			moddata->logical_db,
			moddata->connect_timeout,
			moddata->command_timeout,
			&moddata->reconnect_interval,
			&moddata->reconnect_attempts,
			&moddata->reconnect_wait,
			&moddata->wait_lock,
			env->now_tv,
			"");
		if(!moddata->ctxs[thread]) {
			log_err("redis_init: failed to init redis "
				"(for thread %d)", thread);
		}
	}

	/* and one connection to the replica for every thread */
	if(moddata->replica_ctxs) {
		for(thread = 0; thread < moddata->numctxs; thread++) {
			moddata->replica_ctxs[thread] = redis_connect(
				moddata->replica_server_host,
				moddata->replica_server_port,
				moddata->replica_server_path,
				moddata->replica_server_password,
				moddata->replica_logical_db,
				moddata->replica_connect_timeout,
				moddata->replica_command_timeout,
				&moddata->replica_reconnect_interval,
				&moddata->replica_reconnect_attempts,
				&moddata->replica_reconnect_wait,
				&moddata->replica_wait_lock,
				env->now_tv,
				"replica ");
			if(!moddata->replica_ctxs[thread]) {
				log_err("redis_init: failed to init redis "
					"replica (for thread %d)", thread);
			}
		}
	}

	/* redis_command below reads the module data from backend_data */
	cachedb_env->backend_data = moddata;

	if(env->cfg->redis_expire_records &&
		moddata->ctxs[env->alloc->thread_num] != NULL) {
		int reply_type;
		/* test whether the server accepts SET with the EX option */
		redisReply* probe = redis_command(env, cachedb_env,
			"SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1);
		if(!probe) {
			goto set_with_ex_fail;
		}
		reply_type = probe->type;
		freeReplyObject(probe);
		if(reply_type != REDIS_REPLY_STATUS) {
			goto set_with_ex_fail;
		}
		moddata->set_with_ex_available = 1;
	}
	return 1;

set_with_ex_fail:
	/* not fatal: the backend stays usable, without SET ... EX */
	log_err("redis_init: failure during redis_init, the "
		"redis-expire-records option requires the SET with EX command "
		"(redis >= 2.6.12)");
	return 1;
fail:
	moddata_clean(&moddata);
	return 0;
}
/**
 * Shut down the redis backend: close the connections and free the module
 * data that redis_init stored in cachedb_env->backend_data.
 * @param env: module environment, not used.
 * @param cachedb_env: cachedb environment that holds the backend data.
 */
static void
redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
{
	struct redis_moddata* moddata = (struct redis_moddata*)
		cachedb_env->backend_data;
	(void)env;

	verbose(VERB_OPS, "Redis deinitialization");
	moddata_clean(&moddata);
	/* moddata_clean reset only the local copy of the pointer; clear the
	 * stored one as well so that no pointer to freed memory remains */
	cachedb_env->backend_data = NULL;
}
/**
 * Send a command to redis over the connection of the current thread.
 * Commands that are not writes go to the replica when one is configured,
 * all other commands go to the primary. If the thread has no open
 * connection, one connection attempt is made first. If no reply is
 * received, the connection is closed and its slot is cleared, so that a
 * later call connects again.
 * @param env: module environment, provides the thread number and time.
 * @param cachedb_env: cachedb environment that holds the backend data.
 * @param command: command in redisCommand format.
 * @param data: data argument handed to redisCommand, may be NULL.
 * @param data_len: length of data.
 * @param write: nonzero if the command has to go to the primary.
 * @return the reply, which can be of type REDIS_REPLY_ERROR and has to be
 *	released with freeReplyObject, or NULL if there is no connection
 *	or no reply.
 */
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
	const char* command, const uint8_t* data, size_t data_len, int write)
{
	struct redis_moddata* d = (struct redis_moddata*)
		cachedb_env->backend_data;
	redisContext** slot;
	redisReply* reply;
	int use_replica;

	log_assert(env->alloc->thread_num < d->numctxs);

	/* pick the connection slot of this thread */
	use_replica = (!write && d->replica_ctxs);
	if(use_replica)
		slot = &d->replica_ctxs[env->alloc->thread_num];
	else
		slot = &d->ctxs[env->alloc->thread_num];

	/* no connection at the moment, try to establish one */
	if(!*slot) {
		if(use_replica) {
			*slot = redis_connect(
				d->replica_server_host,
				d->replica_server_port,
				d->replica_server_path,
				d->replica_server_password,
				d->replica_logical_db,
				d->replica_connect_timeout,
				d->replica_command_timeout,
				&d->replica_reconnect_interval,
				&d->replica_reconnect_attempts,
				&d->replica_reconnect_wait,
				&d->replica_wait_lock,
				env->now_tv,
				"replica ");
		} else {
			*slot = redis_connect(
				d->server_host,
				d->server_port,
				d->server_path,
				d->server_password,
				d->logical_db,
				d->connect_timeout,
				d->command_timeout,
				&d->reconnect_interval,
				&d->reconnect_attempts,
				&d->reconnect_wait,
				&d->wait_lock,
				env->now_tv,
				"");
		}
	}
	if(!*slot)
		return NULL;

	reply = (redisReply*)redisCommand(*slot, command, data, data_len);
	if(!reply) {
		/* drop the connection; a later command reconnects */
		log_err("redis_command: failed to receive a reply, "
			"closing connection: %s", (*slot)->errstr);
		redisFree(*slot);
		*slot = NULL;
		return NULL;
	}

	/* the error is logged here; the reply still goes to the caller */
	if(reply->type == REDIS_REPLY_ERROR)
		log_err("redis: %s resulted in an error: %s",
			data ? "set" : "get", reply->str);
	return reply;
}
/**
 * Look up a key in redis with a GET command and copy the value that is
 * found into result_buffer.
 * @param env: module environment.
 * @param cachedb_env: cachedb environment that holds the backend data.
 * @param key: the key to look up.
 * @param result_buffer: on success it is cleared, filled with the value
 *	and flipped; otherwise it is left untouched.
 * @return 1 if a string value was found that fits in result_buffer.
 *	0 if the key is not present, the value is too long, the command
 *	failed or the reply has an unexpected type.
 */
static int
redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
	char* key, struct sldns_buffer* result_buffer)
{
	redisReply* rep;
	/* room for "GET ", the key (presumably the hash in hex digits) and
	 * the terminating zero byte */
#define REDIS_LOOKUP_MAX_BUF_LEN \
	4 \
	+(CACHEDB_HASHSIZE/8)*2 \
	+ 1
	char cmdbuf[REDIS_LOOKUP_MAX_BUF_LEN];
	int n;
	int ret = 0;

	verbose(VERB_ALGO, "redis_lookup of %s", key);

	n = snprintf(cmdbuf, sizeof(cmdbuf), "GET %s", key);
	if(n < 0 || n >= (int)sizeof(cmdbuf)) {
		/* the key did not fit in the command buffer */
		log_err("redis_lookup: unexpected failure to build command");
		return 0;
	}

	rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
	if(!rep)
		return 0;
	switch(rep->type) {
	case REDIS_REPLY_NIL:
		verbose(VERB_ALGO, "redis_lookup: no data cached");
		break;
	case REDIS_REPLY_STRING:
		verbose(VERB_ALGO, "redis_lookup found %d bytes",
			(int)rep->len);
		if((size_t)rep->len > sldns_buffer_capacity(result_buffer)) {
			/* %lu needs an unsigned long argument; size_t can be
			 * a different type on some platforms */
			log_err("redis_lookup: replied data too long: %lu",
				(unsigned long)rep->len);
			break;
		}
		sldns_buffer_clear(result_buffer);
		sldns_buffer_write(result_buffer, rep->str, rep->len);
		sldns_buffer_flip(result_buffer);
		ret = 1;
		break;
	case REDIS_REPLY_ERROR:
		/* redis_command has logged the error already */
		break;
	default:
		log_err("redis_lookup: unexpected type of reply for (%d)",
			rep->type);
		break;
	}
	freeReplyObject(rep);
	return ret;
}
/**
 * Store data under a key in redis.
 * A plain SET is used unless expiry is in effect, that is SET with EX was
 * found available, redis-expire-records is enabled and serve-expired is
 * either off or has a ttl larger than 0. With expiry in effect a ttl of 0
 * expires the key right away with EXPIRE, and any other ttl is extended
 * with the serve-expired ttl and stored with SET ... EX.
 * @param env: module environment, provides the configuration.
 * @param cachedb_env: cachedb environment that holds the backend data.
 * @param key: the key to store under.
 * @param data: the data to store.
 * @param data_len: length of data.
 * @param ttl: time to live of the data in seconds.
 */
static void
redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
	char* key, uint8_t* data, size_t data_len, time_t ttl)
{
	struct redis_moddata* d = (struct redis_moddata*)
		cachedb_env->backend_data;
	redisReply* reply;
	int len;
	int set_ttl = (d->set_with_ex_available &&
		env->cfg->redis_expire_records &&
		(!env->cfg->serve_expired || env->cfg->serve_expired_ttl > 0));
	/* room for the longest command that is built below */
#define REDIS_STORE_MAX_BUF_LEN \
	7 \
	+(CACHEDB_HASHSIZE/8)*2 \
	+ 7 \
	+ 20 \
	+ 1
	char cmdbuf[REDIS_STORE_MAX_BUF_LEN];

	if(set_ttl && ttl == 0) {
		/* expire the record now; this command carries no data */
		verbose(VERB_ALGO, "redis_store expire %s (%d bytes)",
			key, (int)data_len);
		len = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key);
		data = NULL;
		data_len = 0;
	} else if(set_ttl) {
		/* keep the record for the serve-expired period as well */
		ttl += env->cfg->serve_expired_ttl;
		verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u",
			key, (int)data_len, (unsigned)(uint32_t)ttl);
		len = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u", key,
			(unsigned)(uint32_t)ttl);
	} else {
		verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len);
		len = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key);
	}
	if(len < 0 || len >= (int)sizeof(cmdbuf)) {
		log_err("redis_store: unexpected failure to build command");
		return;
	}

	reply = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
	if(!reply)
		return;
	verbose(VERB_ALGO, "redis_store set completed");
	switch(reply->type) {
	case REDIS_REPLY_STATUS:
	case REDIS_REPLY_ERROR:
	case REDIS_REPLY_INTEGER:
		/* expected reply types, nothing more to report here */
		break;
	default:
		log_err("redis_store: unexpected type of reply (%d)",
			reply->type);
		break;
	}
	freeReplyObject(reply);
}
/** The redis backend for cachedb: its name followed by the init, deinit,
 * lookup and store functions defined in this file. */
struct cachedb_backend redis_backend = { "redis",
redis_init, redis_deinit, redis_lookup, redis_store
};
#endif
#endif