root/usr.sbin/nscd/mp_ws_query.c
/*-
 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#include <sys/types.h>
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/time.h>

#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "cachelib.h"
#include "config.h"
#include "debug.h"
#include "log.h"
#include "query.h"
#include "mp_ws_query.h"
#include "singletons.h"

static int on_mp_write_session_abandon_notification(struct query_state *);
static int on_mp_write_session_close_notification(struct query_state *);
static void on_mp_write_session_destroy(struct query_state *);
static int on_mp_write_session_mapper(struct query_state *);
/* int on_mp_write_session_request_read1(struct query_state *); */
static int on_mp_write_session_request_read2(struct query_state *);
static int on_mp_write_session_request_process(struct query_state *);
static int on_mp_write_session_response_write1(struct query_state *);
static int on_mp_write_session_write_request_read1(struct query_state *);
static int on_mp_write_session_write_request_read2(struct query_state *);
static int on_mp_write_session_write_request_process(struct query_state *);
static int on_mp_write_session_write_response_write1(struct query_state *);

/*
 * This function is used as the query_state's destroy_func to make the
 * proper cleanup in case of errors.
 */
static void
on_mp_write_session_destroy(struct query_state *qstate)
{

        TRACE_IN(on_mp_write_session_destroy);
        finalize_comm_element(&qstate->request);
        finalize_comm_element(&qstate->response);

        if (qstate->mdata != NULL) {
                configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
                abandon_cache_mp_write_session(
                        (cache_mp_write_session)qstate->mdata);
                configuration_unlock_entry(qstate->config_entry,
                        CELT_MULTIPART);
        }
        TRACE_OUT(on_mp_write_session_destroy);
}

/*
 * The functions below are used to process multipart write session initiation
 * requests.
 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
 *   read the request itself
 * - on_mp_write_session_request_process processes it
 * - on_mp_write_session_response_write1 sends the response
 */
int
on_mp_write_session_request_read1(struct query_state *qstate)
{
        struct cache_mp_write_session_request   *c_mp_ws_request;
        ssize_t result;

        TRACE_IN(on_mp_write_session_request_read1);
        if (qstate->kevent_watermark == 0)
                qstate->kevent_watermark = sizeof(size_t);
        else {
                init_comm_element(&qstate->request,
                        CET_MP_WRITE_SESSION_REQUEST);
                c_mp_ws_request = get_cache_mp_write_session_request(
                        &qstate->request);

                result = qstate->read_func(qstate,
                        &c_mp_ws_request->entry_length, sizeof(size_t));

                if (result != sizeof(size_t)) {
                        LOG_ERR_3("on_mp_write_session_request_read1",
                                "read failed");
                        TRACE_OUT(on_mp_write_session_request_read1);
                        return (-1);
                }

                if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
                        LOG_ERR_3("on_mp_write_session_request_read1",
                                "invalid entry_length value");
                        TRACE_OUT(on_mp_write_session_request_read1);
                        return (-1);
                }

                c_mp_ws_request->entry = calloc(1,
                        c_mp_ws_request->entry_length + 1);
                assert(c_mp_ws_request->entry != NULL);

                qstate->kevent_watermark = c_mp_ws_request->entry_length;
                qstate->process_func = on_mp_write_session_request_read2;
        }
        TRACE_OUT(on_mp_write_session_request_read1);
        return (0);
}

static int
on_mp_write_session_request_read2(struct query_state *qstate)
{
        struct cache_mp_write_session_request   *c_mp_ws_request;
        ssize_t result;

        TRACE_IN(on_mp_write_session_request_read2);
        c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);

        result = qstate->read_func(qstate, c_mp_ws_request->entry,
                c_mp_ws_request->entry_length);

        if (result < 0 || (size_t)result != qstate->kevent_watermark) {
                LOG_ERR_3("on_mp_write_session_request_read2",
                        "read failed");
                TRACE_OUT(on_mp_write_session_request_read2);
                return (-1);
        }

        qstate->kevent_watermark = 0;
        qstate->process_func = on_mp_write_session_request_process;

        TRACE_OUT(on_mp_write_session_request_read2);
        return (0);
}

static int
on_mp_write_session_request_process(struct query_state *qstate)
{
        struct cache_mp_write_session_request   *c_mp_ws_request;
        struct cache_mp_write_session_response  *c_mp_ws_response;
        cache_mp_write_session  ws;
        cache_entry     c_entry;
        char    *dec_cache_entry_name;

        TRACE_IN(on_mp_write_session_request_process);
        init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
        c_mp_ws_response = get_cache_mp_write_session_response(
                &qstate->response);
        c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);

        qstate->config_entry = configuration_find_entry(
                s_configuration, c_mp_ws_request->entry);
        if (qstate->config_entry == NULL) {
                c_mp_ws_response->error_code = ENOENT;

                LOG_ERR_2("write_session_request",
                        "can't find configuration entry '%s'. "
                        "aborting request", c_mp_ws_request->entry);
                goto fin;
        }

        if (qstate->config_entry->enabled == 0) {
                c_mp_ws_response->error_code = EACCES;

                LOG_ERR_2("write_session_request",
                        "configuration entry '%s' is disabled",
                        c_mp_ws_request->entry);
                goto fin;
        }

        if (qstate->config_entry->perform_actual_lookups != 0) {
                c_mp_ws_response->error_code = EOPNOTSUPP;

                LOG_ERR_2("write_session_request",
                        "entry '%s' performs lookups by itself: "
                        "can't write to it", c_mp_ws_request->entry);
                goto fin;
        } else {
#ifdef NS_NSCD_EID_CHECKING
                if (check_query_eids(qstate) != 0) {
                        c_mp_ws_response->error_code = EPERM;
                        goto fin;
                }
#endif
        }

        /*
         * All multipart entries are separated by their name decorations.
         * For one configuration entry there will be a lot of multipart
         * cache entries - each with its own decorated name.
         */
        asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
                qstate->config_entry->mp_cache_params.cep.entry_name);
        assert(dec_cache_entry_name != NULL);

        configuration_lock_rdlock(s_configuration);
        c_entry = find_cache_entry(s_cache,
                dec_cache_entry_name);
        configuration_unlock(s_configuration);

        if (c_entry == INVALID_CACHE_ENTRY)
                c_entry = register_new_mp_cache_entry(qstate,
                        dec_cache_entry_name);

        free(dec_cache_entry_name);

        assert(c_entry != NULL);
        configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
        ws = open_cache_mp_write_session(c_entry);
        if (ws == INVALID_CACHE_MP_WRITE_SESSION)
                c_mp_ws_response->error_code = -1;
        else {
                qstate->mdata = ws;
                qstate->destroy_func = on_mp_write_session_destroy;

                if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
                    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
                        memcpy(&qstate->timeout,
                                &qstate->config_entry->mp_query_timeout,
                                sizeof(struct timeval));
        }
        configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);

fin:
        qstate->process_func = on_mp_write_session_response_write1;
        qstate->kevent_watermark = sizeof(int);
        qstate->kevent_filter = EVFILT_WRITE;

        TRACE_OUT(on_mp_write_session_request_process);
        return (0);
}

static int
on_mp_write_session_response_write1(struct query_state *qstate)
{
        struct cache_mp_write_session_response  *c_mp_ws_response;
        ssize_t result;

        TRACE_IN(on_mp_write_session_response_write1);
        c_mp_ws_response = get_cache_mp_write_session_response(
                &qstate->response);
        result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
                sizeof(int));
        if (result != sizeof(int)) {
                LOG_ERR_3("on_mp_write_session_response_write1",
                        "write failed");
                TRACE_OUT(on_mp_write_session_response_write1);
                return (-1);
        }

        if (c_mp_ws_response->error_code == 0) {
                qstate->kevent_watermark = sizeof(int);
                qstate->process_func = on_mp_write_session_mapper;
                qstate->kevent_filter = EVFILT_READ;
        } else {
                qstate->kevent_watermark = 0;
                qstate->process_func = NULL;
        }
        TRACE_OUT(on_mp_write_session_response_write1);
        return (0);
}

/*
 * Mapper function is used to avoid multiple connections for each session
 * write or read requests. After processing the request, it does not close
 * the connection, but waits for the next request.
 */
static int
on_mp_write_session_mapper(struct query_state *qstate)
{
        ssize_t result;
        int             elem_type;

        TRACE_IN(on_mp_write_session_mapper);
        if (qstate->kevent_watermark == 0) {
                qstate->kevent_watermark = sizeof(int);
        } else {
                result = qstate->read_func(qstate, &elem_type, sizeof(int));
                if (result != sizeof(int)) {
                        LOG_ERR_3("on_mp_write_session_mapper",
                                "read failed");
                        TRACE_OUT(on_mp_write_session_mapper);
                        return (-1);
                }

                switch (elem_type) {
                case CET_MP_WRITE_SESSION_WRITE_REQUEST:
                        qstate->kevent_watermark = sizeof(size_t);
                        qstate->process_func =
                                on_mp_write_session_write_request_read1;
                        break;
                case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
                        qstate->kevent_watermark = 0;
                        qstate->process_func =
                                on_mp_write_session_abandon_notification;
                        break;
                case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
                        qstate->kevent_watermark = 0;
                        qstate->process_func =
                                on_mp_write_session_close_notification;
                        break;
                default:
                        qstate->kevent_watermark = 0;
                        qstate->process_func = NULL;
                        LOG_ERR_2("on_mp_write_session_mapper",
                                "unknown element type");
                        TRACE_OUT(on_mp_write_session_mapper);
                        return (-1);
                }
        }
        TRACE_OUT(on_mp_write_session_mapper);
        return (0);
}

/*
 * The functions below are used to process multipart write sessions write
 * requests.
 * - on_mp_write_session_write_request_read1 and
 *   on_mp_write_session_write_request_read2 read the request itself
 * - on_mp_write_session_write_request_process processes it
 * - on_mp_write_session_write_response_write1 sends the response
 */
static int
on_mp_write_session_write_request_read1(struct query_state *qstate)
{
        struct cache_mp_write_session_write_request     *write_request;
        ssize_t result;

        TRACE_IN(on_mp_write_session_write_request_read1);
        init_comm_element(&qstate->request,
                CET_MP_WRITE_SESSION_WRITE_REQUEST);
        write_request = get_cache_mp_write_session_write_request(
                &qstate->request);

        result = qstate->read_func(qstate, &write_request->data_size,
                sizeof(size_t));

        if (result != sizeof(size_t)) {
                LOG_ERR_3("on_mp_write_session_write_request_read1",
                        "read failed");
                TRACE_OUT(on_mp_write_session_write_request_read1);
                return (-1);
        }

        if (BUFSIZE_INVALID(write_request->data_size)) {
                LOG_ERR_3("on_mp_write_session_write_request_read1",
                        "invalid data_size value");
                TRACE_OUT(on_mp_write_session_write_request_read1);
                return (-1);
        }

        write_request->data = calloc(1, write_request->data_size);
        assert(write_request->data != NULL);

        qstate->kevent_watermark = write_request->data_size;
        qstate->process_func = on_mp_write_session_write_request_read2;
        TRACE_OUT(on_mp_write_session_write_request_read1);
        return (0);
}

static int
on_mp_write_session_write_request_read2(struct query_state *qstate)
{
        struct cache_mp_write_session_write_request     *write_request;
        ssize_t result;

        TRACE_IN(on_mp_write_session_write_request_read2);
        write_request = get_cache_mp_write_session_write_request(
                &qstate->request);

        result = qstate->read_func(qstate, write_request->data,
                write_request->data_size);

        if (result < 0 || (size_t)result != qstate->kevent_watermark) {
                LOG_ERR_3("on_mp_write_session_write_request_read2",
                        "read failed");
                TRACE_OUT(on_mp_write_session_write_request_read2);
                return (-1);
        }

        qstate->kevent_watermark = 0;
        qstate->process_func = on_mp_write_session_write_request_process;
        TRACE_OUT(on_mp_write_session_write_request_read2);
        return (0);
}

static int
on_mp_write_session_write_request_process(struct query_state *qstate)
{
        struct cache_mp_write_session_write_request     *write_request;
        struct cache_mp_write_session_write_response    *write_response;

        TRACE_IN(on_mp_write_session_write_request_process);
        init_comm_element(&qstate->response,
                CET_MP_WRITE_SESSION_WRITE_RESPONSE);
        write_response = get_cache_mp_write_session_write_response(
                &qstate->response);
        write_request = get_cache_mp_write_session_write_request(
                &qstate->request);

        configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
        write_response->error_code = cache_mp_write(
                (cache_mp_write_session)qstate->mdata,
                write_request->data,
                write_request->data_size);
        configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);

        qstate->kevent_watermark = sizeof(int);
        qstate->process_func = on_mp_write_session_write_response_write1;
        qstate->kevent_filter = EVFILT_WRITE;

        TRACE_OUT(on_mp_write_session_write_request_process);
        return (0);
}

static int
on_mp_write_session_write_response_write1(struct query_state *qstate)
{
        struct cache_mp_write_session_write_response    *write_response;
        ssize_t result;

        TRACE_IN(on_mp_write_session_write_response_write1);
        write_response = get_cache_mp_write_session_write_response(
                &qstate->response);
        result = qstate->write_func(qstate, &write_response->error_code,
                sizeof(int));
        if (result != sizeof(int)) {
                LOG_ERR_3("on_mp_write_session_write_response_write1",
                        "write failed");
                TRACE_OUT(on_mp_write_session_write_response_write1);
                return (-1);
        }

        if (write_response->error_code == 0) {
                finalize_comm_element(&qstate->request);
                finalize_comm_element(&qstate->response);

                qstate->kevent_watermark = sizeof(int);
                qstate->process_func = on_mp_write_session_mapper;
                qstate->kevent_filter = EVFILT_READ;
        } else {
                qstate->kevent_watermark = 0;
                qstate->process_func = 0;
        }

        TRACE_OUT(on_mp_write_session_write_response_write1);
        return (0);
}

/*
 * Handles abandon notifications. Destroys the session by calling the
 * abandon_cache_mp_write_session.
 */
static int
on_mp_write_session_abandon_notification(struct query_state *qstate)
{
        TRACE_IN(on_mp_write_session_abandon_notification);
        configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
        abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
        configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
        qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;

        qstate->kevent_watermark = 0;
        qstate->process_func = NULL;
        TRACE_OUT(on_mp_write_session_abandon_notification);
        return (0);
}

/*
 * Handles close notifications. Commits the session by calling
 * the close_cache_mp_write_session.
 */
static int
on_mp_write_session_close_notification(struct query_state *qstate)
{
        TRACE_IN(on_mp_write_session_close_notification);
        configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
        close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
        configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
        qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;

        qstate->kevent_watermark = 0;
        qstate->process_func = NULL;
        TRACE_OUT(on_mp_write_session_close_notification);
        return (0);
}

cache_entry register_new_mp_cache_entry(struct query_state *qstate,
        const char *dec_cache_entry_name)
{
        cache_entry c_entry;
        char *en_bkp;

        TRACE_IN(register_new_mp_cache_entry);
        c_entry = INVALID_CACHE_ENTRY;
        configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);

        configuration_lock_wrlock(s_configuration);
        en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
        qstate->config_entry->mp_cache_params.cep.entry_name =
                (char *)dec_cache_entry_name;
        register_cache_entry(s_cache, (struct cache_entry_params *)
                &qstate->config_entry->mp_cache_params);
        qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
        configuration_unlock(s_configuration);

        configuration_lock_rdlock(s_configuration);
        c_entry = find_cache_entry(s_cache,
                dec_cache_entry_name);
        configuration_unlock(s_configuration);

        configuration_entry_add_mp_cache_entry(qstate->config_entry,
                c_entry);

        configuration_unlock_entry(qstate->config_entry,
                CELT_MULTIPART);

        TRACE_OUT(register_new_mp_cache_entry);
        return (c_entry);
}