root/usr.sbin/unbound/cachedb/redis.c
/*
 * cachedb/redis.c - cachedb redis module
 *
 * Copyright (c) 2018, NLnet Labs. All rights reserved.
 *
 * This software is open source.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 * 
 * Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 * 
 * Neither the name of the NLNET LABS nor the names of its contributors may
 * be used to endorse or promote products derived from this software without
 * specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * \file
 *
 * This file contains a module that uses the redis database to cache
 * dns responses.
 */

#include "config.h"
#ifdef USE_CACHEDB
#include "cachedb/redis.h"
#include "cachedb/cachedb.h"
#include "util/alloc.h"
#include "util/config_file.h"
#include "util/locks.h"
#include "util/timeval_func.h"
#include "sldns/sbuffer.h"

#ifdef USE_REDIS
#include "hiredis/hiredis.h"

/**
 * Module-wide state of the redis backend.  It is allocated and filled in by
 * redis_init() and released by moddata_clean().
 * Most settings exist twice: once for the primary server and once, with a
 * "replica_" prefix, for an optional replica server.  redis_command() sends
 * non-write commands to the replica when replica_ctxs is allocated, and
 * everything else to the primary server.
 */
struct redis_moddata {
        /* thread-specific redis contexts, indexed by the thread number
         * (env->alloc->thread_num).  An entry is NULL while that thread has
         * no open connection.  replica_ctxs itself is NULL if no replica
         * host or path is configured. */
        redisContext** ctxs;
        redisContext** replica_ctxs;
        /* number of ctx entries (the same count is used for ctxs and for
         * replica_ctxs) */
        int numctxs;
        /* server's IP address or host name.  Like the other strings in this
         * struct this is a shallow reference to the configuration, and it
         * is not freed by this module. */
        const char* server_host;
        const char* replica_server_host;
        /* server's TCP port */
        int server_port;
        int replica_server_port;
        /* server's unix path, or "", NULL if unused.  If set, it is used
         * instead of the host and port. */
        const char* server_path;
        const char* replica_server_path;
        /* server's AUTH password, or "", NULL if unused */
        const char* server_password;
        const char* replica_server_password;
        /* timeout for commands, set on every new connection */
        struct timeval command_timeout;
        struct timeval replica_command_timeout;
        /* timeout for connection setup */
        struct timeval connect_timeout;
        struct timeval replica_connect_timeout;
        /* the reconnect interval time; it is added to the current time to
         * compute reconnect_wait once too many connects have failed. */
        struct timeval reconnect_interval;
        struct timeval replica_reconnect_interval;
        /* reconnect attempts, 0 if connected, counts up failed reconnects.
         * This counter is accessed without holding wait_lock. */
        int reconnect_attempts;
        int replica_reconnect_attempts;
        /* Lock on reconnect_wait time. */
        lock_basic_type wait_lock;
        lock_basic_type replica_wait_lock;
        /* reconnect wait time, wait until it has passed before reconnect. */
        struct timeval reconnect_wait;
        struct timeval replica_reconnect_wait;
        /* the redis logical database to use; it is selected with SELECT
         * on a new connection when the value is larger than 0. */
        int logical_db;
        int replica_logical_db;
        /* if the SET with EX command is supported; set to 1 by redis_init()
         * when its test command got a status reply. */
        int set_with_ex_available;
};

/** The limit on the number of redis connect attempts that may fail in a row.
 * The failure counter is reset by a successful connect.  Once the counter
 * exceeds this limit, redis_connect() does not try again until the
 * reconnect wait time has passed, so that the reconnects are throttled. */
#define REDIS_RECONNECT_ATTEMPT_LIMIT 3

/* Forward declaration: redis_init() calls redis_command() before its
 * definition further down in this file. */
static redisReply* redis_command(struct module_env*, struct cachedb_env*,
        const char*, const uint8_t*, size_t, int);

static void
moddata_clean(struct redis_moddata** moddata) {
        if(!moddata || !*moddata)
                return;
        if((*moddata)->ctxs) {
                int i;
                for(i = 0; i < (*moddata)->numctxs; i++) {
                        if((*moddata)->ctxs[i])
                                redisFree((*moddata)->ctxs[i]);
                }
                free((*moddata)->ctxs);
        }
        if((*moddata)->replica_ctxs) {
                int i;
                for(i = 0; i < (*moddata)->numctxs; i++) {
                        if((*moddata)->replica_ctxs[i])
                                redisFree((*moddata)->replica_ctxs[i]);
                }
                free((*moddata)->replica_ctxs);
        }
        lock_basic_destroy(&(*moddata)->wait_lock);
        lock_basic_destroy(&(*moddata)->replica_wait_lock);
        free(*moddata);
        *moddata = NULL;
}

/**
 * Determine the current time for the reconnect throttle.
 * @param now_tv: time supplied by the caller, or NULL to read the clock with
 *      gettimeofday().
 * @param now_val: filled in with the time to use.  If gettimeofday() fails
 *      the error is logged and the value is set to zero, so that no
 *      indeterminate time is compared or stored afterwards.
 */
static void
redis_get_now(const struct timeval* now_tv, struct timeval* now_val)
{
        if(now_tv) {
                *now_val = *now_tv;
                return;
        }
        if(gettimeofday(now_val, NULL) < 0) {
                log_err("redis: gettimeofday: %s", strerror(errno));
                now_val->tv_sec = 0;
                now_val->tv_usec = 0;
        }
}

/**
 * Set up one connection to a redis server: connect, set the command
 * timeout, authenticate and select the logical database.
 * After more than REDIS_RECONNECT_ATTEMPT_LIMIT failures in a row no
 * connection is attempted until the reconnect wait time has passed.
 * @param host: server's IP address or host name, used with port if no path
 *      is given.
 * @param port: server's TCP port.
 * @param path: unix socket path.  If not NULL and not empty, it is used
 *      instead of host and port.
 * @param password: if not NULL and not empty, it is sent with AUTH.
 * @param logical_db: if larger than 0, this database is chosen with SELECT.
 * @param connect_timeout: timeout for the connection setup.
 * @param command_timeout: timeout that is set on the new connection.
 * @param reconnect_interval: time to wait between connect attempts once the
 *      attempt limit is exceeded.
 * @param reconnect_attempts: failure counter; set to 0 on success and
 *      incremented on failure.  It is accessed without the lock.
 * @param reconnect_wait: time before which no reconnect is attempted; it is
 *      read and written while wait_lock is held.
 * @param wait_lock: lock for reconnect_wait.
 * @param now_tv: the current time, or NULL to use gettimeofday().
 * @param infostr: text that is inserted in the log messages, the callers
 *      pass "" for the primary server and "replica " for the replica.
 * @return the connected context, that the caller has to release with
 *      redisFree(), or NULL on failure or when the reconnect has to wait.
 *      Failures are logged here.
 */
static redisContext*
redis_connect(const char* host, int port, const char* path,
        const char* password, int logical_db,
        const struct timeval connect_timeout,
        const struct timeval command_timeout,
        const struct timeval* reconnect_interval,
        int* reconnect_attempts,
        struct timeval* reconnect_wait,
        lock_basic_type* wait_lock,
        struct timeval* now_tv,
        const char* infostr)
{
        struct timeval now_val;
        redisContext* ctx;

        /* See if the redis server is down, and reconnect has to wait. */
        if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
                /* Acquire lock to look at timeval, the integer has atomic
                 * integrity. */
                struct timeval wait_tv;
                redis_get_now(now_tv, &now_val);
                lock_basic_lock(wait_lock);
                wait_tv = *reconnect_wait;
                lock_basic_unlock(wait_lock);
                if(timeval_smaller(&now_val, &wait_tv)) {
                        verbose(VERB_ALGO, "redis %sdown, reconnect wait",
                                infostr);
                        return NULL;
                }
        }

        /* A configured unix socket path takes precedence over host/port. */
        if(path && path[0]!=0) {
                ctx = redisConnectUnixWithTimeout(path, connect_timeout);
        } else {
                ctx = redisConnectWithTimeout(host, port, connect_timeout);
        }
        if(!ctx || ctx->err) {
                /* without a context there is no error string to print */
                const char *errstr = "out of memory";
                if(ctx)
                        errstr = ctx->errstr;
                log_err("failed to connect to redis %sserver: %s", infostr, errstr);
                goto fail;
        }
        if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
                log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr);
                goto fail;
        }
        if(password && password[0]!=0) {
                redisReply* rep;
                rep = redisCommand(ctx, "AUTH %s", password);
                if(!rep || rep->type == REDIS_REPLY_ERROR) {
                        log_err("failed to authenticate %swith password", infostr);
                        freeReplyObject(rep);
                        goto fail;
                }
                freeReplyObject(rep);
        }
        if(logical_db > 0) {
                redisReply* rep;
                rep = redisCommand(ctx, "SELECT %d", logical_db);
                if(!rep || rep->type == REDIS_REPLY_ERROR) {
                        log_err("failed %sto set logical database (%d)",
                                infostr, logical_db);
                        freeReplyObject(rep);
                        goto fail;
                }
                freeReplyObject(rep);
        }
        /* Connected, the throttle is no longer needed. */
        *reconnect_attempts = 0;
        if(verbosity >= VERB_OPS) {
                /* a space, at most five digits for the port, and the \0 */
                char port_str[6+1];
                port_str[0] = ' ';
                (void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port);
                verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)",
                        infostr,
                        path&&path[0]!=0?path:host,
                        path&&path[0]!=0?"":port_str);
        }
        return ctx;

fail:
        if(ctx)
                redisFree(ctx);
        (*reconnect_attempts)++;
        if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) {
                /* Wait for the reconnect interval before trying again. */
                struct timeval tv;
                redis_get_now(now_tv, &now_val);
                tv = now_val;
                timeval_add(&tv, reconnect_interval);
                lock_basic_lock(wait_lock);
                *reconnect_wait = tv;
                lock_basic_unlock(wait_lock);
                verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d",
                        infostr, (int)tv.tv_sec, (int)tv.tv_usec);
        }
        return NULL;
}

/**
 * Convert a timeout in milliseconds to a timeval.
 * @param timeout: the timeval that is filled in.
 * @param value: the timeout in milliseconds that is used when no explicit
 *      value is given.
 * @param explicit_value: if not 0, this number of milliseconds is used
 *      instead of value.
 */
static void
set_timeout(struct timeval* timeout, int value, int explicit_value)
{
        int msec = value;

        if(explicit_value != 0)
                msec = explicit_value;
        /* whole seconds, and the remaining milliseconds as microseconds */
        timeout->tv_sec = msec / 1000;
        timeout->tv_usec = (msec % 1000) * 1000;
}

/**
 * Initialize the redis backend.  The module data is allocated and filled in
 * from the configuration in env->cfg, a connection to the primary server
 * (and to the replica, if one is configured) is attempted for every context
 * slot, and if redis-expire-records is enabled it is tested whether the
 * server supports SET with EX.
 * @param env: module environment, with the configuration, the current time
 *      and the thread number of the caller.
 * @param cachedb_env: cachedb environment; its backend_data is set to the
 *      new module data on success.
 * @return 0 if memory allocation failed, in that case nothing is stored in
 *      cachedb_env.  Otherwise 1: a connection that could not be set up is
 *      logged and established later by redis_command(), and a failed SET
 *      with EX test is logged and leaves set_with_ex_available at 0.
 */
static int
redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
{
        int i;
        struct redis_moddata* moddata = NULL;

        verbose(VERB_OPS, "Redis initialization");

        moddata = calloc(1, sizeof(struct redis_moddata));
        if(!moddata) {
                log_err("out of memory");
                goto fail;
        }
        lock_basic_init(&moddata->wait_lock);
        lock_protect(&moddata->wait_lock, &moddata->reconnect_wait,
                sizeof(moddata->reconnect_wait));
        lock_basic_init(&moddata->replica_wait_lock);
        lock_protect(&moddata->replica_wait_lock,
                &moddata->replica_reconnect_wait,
                sizeof(moddata->replica_reconnect_wait));
        /* One context slot per thread.  Keep at least one slot, so that the
         * arrays are never allocated with a zero size and the slot for the
         * thread number that is looked up below exists also when
         * num_threads is not positive.
         * NOTE(review): this assumes that thread number 0 is used in that
         * case; confirm against the code that sets env->alloc->thread_num. */
        moddata->numctxs = env->cfg->num_threads;
        if(moddata->numctxs < 1)
                moddata->numctxs = 1;
        /* note: server_host and similar string configuration options are
         * shallow references to configured strings; we don't have to free them
         * in this module. */
        moddata->server_host = env->cfg->redis_server_host;
        moddata->replica_server_host = env->cfg->redis_replica_server_host;

        moddata->server_port = env->cfg->redis_server_port;
        moddata->replica_server_port = env->cfg->redis_replica_server_port;

        moddata->server_path = env->cfg->redis_server_path;
        moddata->replica_server_path = env->cfg->redis_replica_server_path;

        moddata->server_password = env->cfg->redis_server_password;
        moddata->replica_server_password = env->cfg->redis_replica_server_password;

        /* The specific command and connect timeouts override the general
         * timeout when they are not 0. */
        set_timeout(&moddata->command_timeout,
                env->cfg->redis_timeout,
                env->cfg->redis_command_timeout);
        set_timeout(&moddata->replica_command_timeout,
                env->cfg->redis_replica_timeout,
                env->cfg->redis_replica_command_timeout);
        set_timeout(&moddata->connect_timeout,
                env->cfg->redis_timeout,
                env->cfg->redis_connect_timeout);
        set_timeout(&moddata->replica_connect_timeout,
                env->cfg->redis_replica_timeout,
                env->cfg->redis_replica_connect_timeout);
        /* The reconnect interval is fixed at one second. */
        set_timeout(&moddata->reconnect_interval, 1000, 0);
        set_timeout(&moddata->replica_reconnect_interval, 1000, 0);

        moddata->logical_db = env->cfg->redis_logical_db;
        moddata->replica_logical_db = env->cfg->redis_replica_logical_db;

        moddata->ctxs = calloc((size_t)moddata->numctxs,
                sizeof(redisContext*));
        if(!moddata->ctxs) {
                log_err("out of memory");
                goto fail;
        }
        if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
                || (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
                /* There is a replica configured, allocate ctxs */
                moddata->replica_ctxs = calloc((size_t)moddata->numctxs,
                        sizeof(redisContext*));
                if(!moddata->replica_ctxs) {
                        log_err("out of memory");
                        goto fail;
                }
        }
        for(i = 0; i < moddata->numctxs; i++) {
                redisContext* ctx = redis_connect(
                        moddata->server_host,
                        moddata->server_port,
                        moddata->server_path,
                        moddata->server_password,
                        moddata->logical_db,
                        moddata->connect_timeout,
                        moddata->command_timeout,
                        &moddata->reconnect_interval,
                        &moddata->reconnect_attempts,
                        &moddata->reconnect_wait,
                        &moddata->wait_lock,
                        env->now_tv,
                        "");
                if(!ctx) {
                        log_err("redis_init: failed to init redis "
                                "(for thread %d)", i);
                        /* And continue, the context can be established
                         * later, just like after a disconnect. */
                }
                moddata->ctxs[i] = ctx;
        }
        if(moddata->replica_ctxs) {
                for(i = 0; i < moddata->numctxs; i++) {
                        redisContext* ctx = redis_connect(
                                moddata->replica_server_host,
                                moddata->replica_server_port,
                                moddata->replica_server_path,
                                moddata->replica_server_password,
                                moddata->replica_logical_db,
                                moddata->replica_connect_timeout,
                                moddata->replica_command_timeout,
                                &moddata->replica_reconnect_interval,
                                &moddata->replica_reconnect_attempts,
                                &moddata->replica_reconnect_wait,
                                &moddata->replica_wait_lock,
                                env->now_tv,
                                "replica ");
                        if(!ctx) {
                                log_err("redis_init: failed to init redis "
                                        "replica (for thread %d)", i);
                                /* And continue, the context can be established
                                 * later, just like after a disconnect. */
                        }
                        moddata->replica_ctxs[i] = ctx;
                }
        }
        /* redis_command() below reads the module data from cachedb_env. */
        cachedb_env->backend_data = moddata;
        if(env->cfg->redis_expire_records &&
                moddata->ctxs[env->alloc->thread_num] != NULL) {
                redisReply* rep = NULL;
                int redis_reply_type = 0;
                /** check if set with ex command is supported */
                rep = redis_command(env, cachedb_env,
                        "SET __UNBOUND_REDIS_CHECK__ none EX 1", NULL, 0, 1);
                if(!rep) {
                        /** init failed, no response from redis server*/
                        goto set_with_ex_fail;
                }
                redis_reply_type = rep->type;
                freeReplyObject(rep);
                switch(redis_reply_type) {
                case REDIS_REPLY_STATUS:
                        break;
                default:
                        /** init failed, set_with_ex command not supported */
                        goto set_with_ex_fail;
                }
                moddata->set_with_ex_available = 1;
        }
        return 1;

set_with_ex_fail:
        /* Not fatal: the module data stays in place and records are stored
         * without an expiry time, because set_with_ex_available is 0. */
        log_err("redis_init: failure during redis_init, the "
                "redis-expire-records option requires the SET with EX command "
                "(redis >= 2.6.12)");
        return 1;
fail:
        moddata_clean(&moddata);
        return 0;
}

/**
 * Deinitialize the redis backend: close the connections and release the
 * module data that redis_init() stored in the cachedb environment.
 * @param env: module environment, not used.
 * @param cachedb_env: cachedb environment that holds the module data in
 *      backend_data.  The reference is set to NULL, so that no pointer to
 *      the freed module data is left behind.
 */
static void
redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
{
        struct redis_moddata* moddata = (struct redis_moddata*)
                cachedb_env->backend_data;
        (void)env;

        verbose(VERB_OPS, "Redis deinitialization");
        /* moddata_clean() only resets the local copy of the pointer, the
         * reference in cachedb_env has to be cleared here. */
        moddata_clean(&moddata);
        cachedb_env->backend_data = NULL;
}

/*
 * Send a redis command and get a reply.  Unified so that it can be used for
 * both reading (GET) and writing (SET, EXPIRE).  The implementation of this
 * function is agnostic about the semantics of the command: 'command',
 * 'data', and 'data_len' are opaquely passed to redisCommand().  The callers
 * pass NULL and 0 for 'data' and 'data_len' when the command has no binary
 * argument.
 * If 'write' is zero and a replica is configured, the command is sent over
 * the replica connection of this thread, otherwise over the connection to
 * the primary server.  'write' also decides whether an error reply is
 * logged as "set" or as "get".
 * This function first checks whether a connection with a redis server has
 * been established; if not it tries to set up a new one.
 * It returns redisReply returned from redisCommand() or NULL if some low
 * level error happens.  The caller is responsible to check the return value,
 * if it's non-NULL, it has to free it with freeReplyObject().
 */
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
        const char* command, const uint8_t* data, size_t data_len, int write)
{
        redisContext* ctx, **ctx_selector;
        redisReply* rep;
        struct redis_moddata* d = (struct redis_moddata*)
                cachedb_env->backend_data;
        int use_replica;

        /* We assume env->alloc->thread_num is a unique ID for each thread
         * in [0, num-of-threads).  We could treat it as an error condition
         * if the assumption didn't hold, but it seems to be a fundamental
         * assumption throughout the unbound architecture, so we simply assert
         * it. */
        log_assert(env->alloc->thread_num < d->numctxs);

        /* Only reads go to the replica, and only if there is one. */
        use_replica = (!write && d->replica_ctxs != NULL);
        ctx_selector = use_replica ? d->replica_ctxs : d->ctxs;
        ctx = ctx_selector[env->alloc->thread_num];

        /* If we've not established a connection to the server or we've closed
         * it on a failure, try to re-establish a new one.   Failures will be
         * logged in redis_connect(). */
        if(!ctx) {
                if(use_replica) {
                        ctx = redis_connect(
                                d->replica_server_host,
                                d->replica_server_port,
                                d->replica_server_path,
                                d->replica_server_password,
                                d->replica_logical_db,
                                d->replica_connect_timeout,
                                d->replica_command_timeout,
                                &d->replica_reconnect_interval,
                                &d->replica_reconnect_attempts,
                                &d->replica_reconnect_wait,
                                &d->replica_wait_lock,
                                env->now_tv,
                                "replica ");
                } else {
                        ctx = redis_connect(
                                d->server_host,
                                d->server_port,
                                d->server_path,
                                d->server_password,
                                d->logical_db,
                                d->connect_timeout,
                                d->command_timeout,
                                &d->reconnect_interval,
                                &d->reconnect_attempts,
                                &d->reconnect_wait,
                                &d->wait_lock,
                                env->now_tv,
                                "");
                }
                ctx_selector[env->alloc->thread_num] = ctx;
        }
        if(!ctx) return NULL;

        /* Send the command and get a reply, synchronously. */
        rep = (redisReply*)redisCommand(ctx, command, data, data_len);
        if(!rep) {
                /* Once an error as a NULL-reply is returned the context cannot
                 * be reused and we'll need to set up a new connection. */
                log_err("redis_command: failed to receive a reply, "
                        "closing connection: %s", ctx->errstr);
                redisFree(ctx);
                ctx_selector[env->alloc->thread_num] = NULL;
                return NULL;
        }

        /* Check error in reply to unify logging in that case.
         * The caller may perform context-dependent checks and logging.
         * The label follows the write flag and not the presence of data,
         * because write commands without data (such as EXPIRE) exist. */
        if(rep->type == REDIS_REPLY_ERROR)
                log_err("redis: %s resulted in an error: %s",
                        write ? "set" : "get", rep->str);

        return rep;
}

/**
 * Look up a key in redis with the GET command.
 * @param env: module environment.
 * @param cachedb_env: cachedb environment with the redis module data.
 * @param key: the key to look up.  The command buffer has room for a key of
 *      (CACHEDB_HASHSIZE/8)*2 characters; for a longer key the lookup
 *      fails with a logged error.
 * @param result_buffer: on success it is cleared, filled with the stored
 *      data and flipped.
 * @return 1 if data was found and written to result_buffer.  0 if there is
 *      no data for the key, on failure, or if the data is larger than the
 *      capacity of result_buffer.
 */
static int
redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
        char* key, struct sldns_buffer* result_buffer)
{
        redisReply* rep;
        /* Supported commands:
         * - "GET " + key
         */
#define REDIS_LOOKUP_MAX_BUF_LEN                        \
        4                               /* "GET " */    \
        +(CACHEDB_HASHSIZE/8)*2         /* key hash */  \
        + 1                             /* \0 */
        char cmdbuf[REDIS_LOOKUP_MAX_BUF_LEN];
        int n;
        int ret = 0;

        verbose(VERB_ALGO, "redis_lookup of %s", key);

        n = snprintf(cmdbuf, sizeof(cmdbuf), "GET %s", key);
        if(n < 0 || n >= (int)sizeof(cmdbuf)) {
                log_err("redis_lookup: unexpected failure to build command");
                return 0;
        }

        /* This is a read, so the write flag is 0. */
        rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
        if(!rep)
                return 0;
        switch(rep->type) {
        case REDIS_REPLY_NIL:
                verbose(VERB_ALGO, "redis_lookup: no data cached");
                break;
        case REDIS_REPLY_STRING:
                verbose(VERB_ALGO, "redis_lookup found %d bytes",
                        (int)rep->len);
                if((size_t)rep->len > sldns_buffer_capacity(result_buffer)) {
                        /* The cast matches the %lu conversion; size_t is
                         * not unsigned long on every platform. */
                        log_err("redis_lookup: replied data too long: %lu",
                                (unsigned long)rep->len);
                        break;
                }
                sldns_buffer_clear(result_buffer);
                sldns_buffer_write(result_buffer, rep->str, rep->len);
                sldns_buffer_flip(result_buffer);
                ret = 1;
                break;
        case REDIS_REPLY_ERROR:
                break;          /* already logged */
        default:
                log_err("redis_lookup: unexpected type of reply for (%d)",
                        rep->type);
                break;
        }
        freeReplyObject(rep);
        return ret;
}

/**
 * Store data in redis under a key, or expire the key.
 * An expiry time is only used if the SET with EX command is available,
 * redis-expire-records is enabled, and serve-expired is either disabled or
 * has a serve-expired-ttl larger than 0.  In that case a ttl of 0 expires
 * the key with the EXPIRE command, and another ttl is increased with the
 * serve-expired-ttl and passed along with SET.  Otherwise a plain SET is
 * used.  Failures are logged, nothing is returned.
 * @param env: module environment, with the configuration.
 * @param cachedb_env: cachedb environment with the redis module data.
 * @param key: the key to store the data under.
 * @param data: the data to store, it is passed as a binary safe string.
 * @param data_len: length of data in bytes.
 * @param ttl: the time to live of the record in seconds.
 */
static void
redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
        char* key, uint8_t* data, size_t data_len, time_t ttl)
{
        struct redis_moddata* d = (struct redis_moddata*)
                cachedb_env->backend_data;
        redisReply* reply;
        int cmdlen;
        int with_expiry = 0;
        /* Supported commands:
         * - "SET " + key + " %b"
         * - "SET " + key + " %b EX " + ttl
         *   older redis 2.0.0 was "SETEX " + key + " " + ttl + " %b"
         * - "EXPIRE " + key + " 0"
         */
#define REDIS_STORE_MAX_BUF_LEN                         \
        7                       /* "EXPIRE " */         \
        +(CACHEDB_HASHSIZE/8)*2 /* key hash */          \
        + 7                     /* " %b EX " */         \
        + 20                    /* ttl (uint64_t) */    \
        + 1                     /* \0 */
        char cmdbuf[REDIS_STORE_MAX_BUF_LEN];

        if(d->set_with_ex_available && env->cfg->redis_expire_records) {
                if(!env->cfg->serve_expired ||
                        env->cfg->serve_expired_ttl > 0)
                        with_expiry = 1;
        }

        if(with_expiry && ttl == 0) {
                /* SET with EX 0 is an invalid time, so the key is expired
                 * with the EXPIRE command.  That takes no data and replies
                 * with REDIS_REPLY_INTEGER of 1. */
                verbose(VERB_ALGO, "redis_store expire %s (%d bytes)",
                        key, (int)data_len);
                cmdlen = snprintf(cmdbuf, sizeof(cmdbuf), "EXPIRE %s 0", key);
                data = NULL;
                data_len = 0;
        } else if(with_expiry) {
                /* keep the key for the serve expired time as well, so that
                 * redis does not evict it prematurely */
                ttl += env->cfg->serve_expired_ttl;
                verbose(VERB_ALGO, "redis_store %s (%d bytes) with ttl %u",
                        key, (int)data_len, (unsigned)(uint32_t)ttl);
                /* the data is passed as a binary safe string */
                cmdlen = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b EX %u",
                        key, (unsigned)(uint32_t)ttl);
        } else {
                verbose(VERB_ALGO, "redis_store %s (%d bytes)", key, (int)data_len);
                /* the data is passed as a binary safe string */
                cmdlen = snprintf(cmdbuf, sizeof(cmdbuf), "SET %s %%b", key);
        }

        if(cmdlen < 0 || cmdlen >= (int)sizeof(cmdbuf)) {
                log_err("redis_store: unexpected failure to build command");
                return;
        }

        /* This is a write, so it goes to the primary server. */
        reply = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
        if(!reply)
                return;
        verbose(VERB_ALGO, "redis_store set completed");
        switch(reply->type) {
        case REDIS_REPLY_STATUS:
        case REDIS_REPLY_INTEGER:
                break;
        case REDIS_REPLY_ERROR:
                break;          /* already logged by redis_command */
        default:
                log_err("redis_store: unexpected type of reply (%d)",
                        reply->type);
                break;
        }
        freeReplyObject(reply);
}

/** The redis backend descriptor: the backend name followed by the init,
 * deinit, lookup and store functions that are defined in this file.
 * NOTE(review): the initializer is positional, so it depends on the member
 * order of struct cachedb_backend, that is declared outside of this file. */
struct cachedb_backend redis_backend = { "redis",
        redis_init, redis_deinit, redis_lookup, redis_store
};
#endif  /* USE_REDIS */
#endif /* USE_CACHEDB */