root/usr/src/cmd/sort/streams.c
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include "streams.h"

/*
 * Operations vector installed by stream_set() for a stream with no source:
 * every entry is NULL.
 */
static const stream_ops_t invalid_ops = {
        NULL, NULL, NULL, NULL, NULL,
        NULL, NULL, NULL, NULL, NULL
};

/*
 * stream_new() obtains storage for a stream_t from safe_realloc(), clears it,
 * and applies the flags given in src via stream_set().  The new stream is
 * returned to the caller.
 */
stream_t *
stream_new(int src)
{
        stream_t *new_str;

        new_str = safe_realloc(NULL, sizeof (stream_t));
        stream_clear(new_str);
        stream_set(new_str, src);

        return (new_str);
}

/*
 * stream_set() applies flags to the stream's status word.
 *
 * If flags carries a source type (any bit within STREAM_SOURCE_MASK), the
 * stream's source type is replaced, and the matching operations vector and
 * element size are installed; array streams keep whatever element size they
 * already have.  The remaining flag bits are then OR-ed into s_status.
 *
 * Finally, for STREAM_SINGLE and STREAM_WIDE streams, STREAM_UNIQUE selects
 * the unique variant of the put-line operation and STREAM_INSTANT selects the
 * overwrite variant of the fetch operation.
 */
void
stream_set(stream_t *str, flag_t flags)
{
        flag_t new_source = flags & STREAM_SOURCE_MASK;

        if (new_source) {
                ASSERT(new_source == STREAM_ARRAY ||
                    new_source == STREAM_SINGLE ||
                    new_source == STREAM_MMAP ||
                    new_source == STREAM_WIDE);

                str->s_status &= ~STREAM_SOURCE_MASK;
                str->s_status |= new_source;

                switch (new_source) {
                case STREAM_NO_SOURCE:
                        str->s_element_size = 0;
                        str->s_ops = invalid_ops;
                        return;
                case STREAM_ARRAY:
                        /* Element size is inherited; only swap the ops. */
                        str->s_ops = stream_array_ops;
                        break;
                case STREAM_MMAP:
                        str->s_ops = stream_mmap_ops;
                        str->s_element_size = sizeof (char);
                        break;
                case STREAM_SINGLE:
                        str->s_ops = stream_stdio_ops;
                        str->s_element_size = sizeof (char);
                        break;
                case STREAM_WIDE:
                        str->s_ops = stream_wide_ops;
                        str->s_element_size = sizeof (wchar_t);
                        break;
                default:
                        die(EMSG_UNKN_STREAM, str->s_status);
                }
        }

        /* Everything outside the source mask is purely additive. */
        str->s_status |= (flags & ~STREAM_SOURCE_MASK);

        /* Unique output overrides the put-line operation. */
        if (str->s_status & STREAM_UNIQUE) {
                if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_SINGLE)
                        str->s_ops.sop_put_line =
                            stream_stdio_put_line_unique;
                else if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_WIDE)
                        str->s_ops.sop_put_line =
                            stream_wide_put_line_unique;
        }

        /* Instant streams override the fetch operation. */
        if (str->s_status & STREAM_INSTANT) {
                if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_SINGLE)
                        str->s_ops.sop_fetch =
                            stream_stdio_fetch_overwrite;
                else if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_WIDE)
                        str->s_ops.sop_fetch =
                            stream_wide_fetch_overwrite;
        }
}

/*
 * stream_unset() clears the given non-source flags from the stream's status.
 * Source-type bits may not be passed in (asserted), and are masked out of the
 * clear operation in any case.
 */
void
stream_unset(stream_t *streamp, flag_t flags)
{
        ASSERT((flags & STREAM_SOURCE_MASK) == 0);

        streamp->s_status &= ~(flags & ~STREAM_SOURCE_MASK);
}

/*
 * stream_is_primed() returns non-zero if STREAM_PRIMED is set in the stream's
 * status, and zero otherwise.  The value returned is the masked status, not a
 * normalized boolean.
 */
int
stream_is_primed(stream_t *streamp)
{
        int primed = streamp->s_status & STREAM_PRIMED;

        return (primed);
}

/*
 * stream_clear() zero-fills the entire stream_t referred to by str.
 */
void
stream_clear(stream_t *str)
{
        (void) memset(str, 0, sizeof (*str));
}

/*
 * stream_copy() makes dest a byte-for-byte (shallow) copy of src; any
 * pointers held in src are shared with dest afterwards.
 */
static void
stream_copy(stream_t *dest, stream_t *src)
{
        (void) memcpy(dest, src, sizeof (*dest));
}

/*
 * stream_stat_chain() walks the chain starting at strp and, for each stream
 * that is neither flagged STREAM_NOTFILE nor STREAM_ARRAY, records the device,
 * inode and size reported by stat(2) for its filename.  A stat() failure is
 * reported through die().
 */
void
stream_stat_chain(stream_t *strp)
{
        stream_t *sp;
        struct stat sb;

        for (sp = strp; sp != NULL; sp = sp->s_next) {
                /* Nothing to stat for non-file and in-memory streams. */
                if ((sp->s_status & STREAM_NOTFILE) ||
                    (sp->s_status & STREAM_ARRAY))
                        continue;

                if (stat(sp->s_filename, &sb) < 0)
                        die(EMSG_STAT, sp->s_filename);

                sp->s_dev = sb.st_dev;
                sp->s_ino = sb.st_ino;
                sp->s_filesize = sb.st_size;
        }
}

/*
 * stream_count_chain() returns the number of streams reachable from str by
 * following s_next links; a NULL chain has length zero.
 */
uint_t
stream_count_chain(stream_t *str)
{
        uint_t count;

        for (count = 0; str != NULL; str = str->s_next)
                count++;

        return (count);
}

/*
 * stream_open_for_read() opens an input stream and selects its source type.
 *
 * Array streams are simply marked open.  Streams flagged STREAM_NOTFILE read
 * from stdin.  For named files in a single-byte locale whose recorded
 * s_filesize is positive and below SSIZE_MAX, an mmap() of the whole file is
 * attempted first (unless DEBUG_DISALLOW_MMAP is defined); if that is not
 * applicable, or the mapping fails, the file is opened with stdio and given
 * a STDIO_VBUF_SIZE buffer.
 *
 * Returns 1 on success, or -1 if open()/fopen() failed with EMFILE or ENFILE
 * (no descriptors available).  Any other open failure is reported through
 * die().
 */
int
stream_open_for_read(sort_t *S, stream_t *str)
{
        int fd;

        ASSERT(!(str->s_status & STREAM_OUTPUT));

        /*
         * STREAM_ARRAY streams are open by definition.
         */
        if ((str->s_status & STREAM_SOURCE_MASK) == STREAM_ARRAY) {
                stream_set(str, STREAM_ARRAY | STREAM_OPEN);
                return (1);
        }

        /*
         * Set data type according to locale for input from stdin.
         */
        if (str->s_status & STREAM_NOTFILE) {
                str->s_type.BF.s_fp = stdin;
                stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ?
                    STREAM_SINGLE : STREAM_WIDE));
                return (1);
        }

        ASSERT(str->s_filename);

#ifndef DEBUG_DISALLOW_MMAP
        /*
         * NOTE(review): s_filesize is presumably filled in beforehand by
         * stream_stat_chain(); confirm against callers.
         */
        if (S->m_single_byte_locale &&
            str->s_filesize > 0 &&
            str->s_filesize < SSIZE_MAX) {
                /*
                 * make mmap() attempt; set s_status and return if successful
                 */
                fd = open(str->s_filename, O_RDONLY);
                if (fd < 0) {
                        /*
                         * Descriptor exhaustion is reported to the caller;
                         * everything else is fatal.
                         */
                        if (errno == EMFILE || errno == ENFILE)
                                return (-1);
                        else
                                die(EMSG_OPEN, str->s_filename);
                }
                str->s_buffer = mmap(0, str->s_filesize, PROT_READ,
                    MAP_SHARED, fd, 0);

                if (str->s_buffer != MAP_FAILED) {
                        /*
                         * The mapping is the stream's buffer; the descriptor
                         * is kept in the stream alongside it.
                         */
                        str->s_buffer_size = str->s_filesize;
                        str->s_type.SF.s_fd = fd;

                        stream_set(str, STREAM_MMAP | STREAM_OPEN);
                        stream_unset(str, STREAM_PRIMED);
                        return (1);
                }

                /*
                 * Otherwise the mmap() failed (presumably due to address
                 * space exhaustion); since we have already opened the file,
                 * we close it and drop into the normal (STDIO) case.
                 */
                (void) close(fd);
                str->s_buffer = NULL;
        }
#endif /* DEBUG_DISALLOW_MMAP */

        if ((str->s_type.BF.s_fp = fopen(str->s_filename, "r")) == NULL) {
                if (errno == EMFILE || errno == ENFILE)
                        return (-1);
                else
                        die(EMSG_OPEN, str->s_filename);
        }

        /*
         * Supply our own fully-buffered stdio buffer; if setvbuf() refuses
         * it, release the buffer and continue with stdio's default buffering.
         */
        str->s_type.BF.s_vbuf = safe_realloc(NULL, STDIO_VBUF_SIZE);
        if (setvbuf(str->s_type.BF.s_fp, str->s_type.BF.s_vbuf, _IOFBF,
            STDIO_VBUF_SIZE) != 0) {
                safe_free(str->s_type.BF.s_vbuf);
                str->s_type.BF.s_vbuf = NULL;
        }

        stream_set(str, STREAM_OPEN | (S->m_single_byte_locale ? STREAM_SINGLE :
            STREAM_WIDE));
        stream_unset(str, STREAM_PRIMED);

        return (1);
}

/*
 * stream_set_size() resizes the stream's buffer.
 *
 * A new_size of zero releases the current buffer, if any, and leaves the
 * stream with no buffer.  Otherwise the request is padded to a multiple of
 * PAGESIZE; if the stream's buffer already has that padded size nothing is
 * done, else the old mapping is dropped and a new one is obtained from
 * xzmap().  The previous contents are not carried over.  die() is called if
 * the new mapping cannot be established.
 */
void
stream_set_size(stream_t *str, size_t new_size)
{
        size_t p_new_size;

        /*
         * Handle the release request before the "already this size" shortcut
         * below: a request of zero pads to exactly PAGESIZE, so a stream
         * whose buffer happened to be one page long would otherwise keep its
         * mapping instead of giving it up.
         */
        if (new_size == 0) {
                if (str->s_buffer != NULL)
                        (void) munmap(str->s_buffer, str->s_buffer_size);

                str->s_buffer = NULL;
                str->s_buffer_size = 0;
                return;
        }

        /*
         * p_new_size is new_size padded to a multiple of PAGESIZE, since
         * mmap() is going to reserve it in any case.  For a power-of-two
         * PAGESIZE this expression yields the smallest multiple strictly
         * greater than new_size, so a request that is already page-aligned
         * still gains one page.  This ensures that the far end of the buffer
         * is also aligned, such that we obtain aligned pointers if we choose
         * to subtract from it.
         */
        p_new_size = (new_size + PAGESIZE) & ~(PAGESIZE - 1);

        if (str->s_buffer_size == p_new_size)
                return;

        if (str->s_buffer != NULL)
                (void) munmap(str->s_buffer, str->s_buffer_size);

        str->s_buffer = xzmap(0, p_new_size, PROT_READ | PROT_WRITE,
            MAP_PRIVATE, 0);

        if (str->s_buffer == MAP_FAILED)
                die(EMSG_MMAP);

        str->s_buffer_size = p_new_size;
}

/*
 * stream_add_file_to_chain() creates a new source-less stream for filename
 * and appends it to the chain.  The filename pointer is stored as given, not
 * copied, and the stream's file descriptor is initialized to -1.
 */
void
stream_add_file_to_chain(stream_t **str_chain, char *filename)
{
        stream_t *file_str = stream_new(STREAM_NO_SOURCE);

        file_str->s_type.SF.s_fd = -1;
        file_str->s_filename = filename;

        stream_push_to_chain(str_chain, file_str);
}

/*
 * stream_push_to_chain() appends streamp to the end of the doubly-linked
 * chain whose head pointer is *str_chain.  If the chain is empty, streamp
 * becomes its head.  On return streamp is the tail: its s_next is NULL and
 * its s_previous refers to the former tail, or is NULL if streamp is now the
 * head.
 */
void
stream_push_to_chain(stream_t **str_chain, stream_t *streamp)
{
        stream_t *tail = *str_chain;

        if (tail == NULL) {
                /*
                 * The head of a chain has no predecessor.  Reset s_previous
                 * explicitly: streamp may be a copy of a stream that lived on
                 * another chain, and a stale back pointer would lead
                 * stream_close_all_previous() into that other chain.
                 */
                *str_chain = streamp;
                streamp->s_previous = NULL;
                streamp->s_next = NULL;
                return;
        }

        while (tail->s_next != NULL)
                tail = tail->s_next;

        tail->s_next = streamp;
        streamp->s_previous = tail;
        streamp->s_next = NULL;
}

/*
 * stream_dump() copies the remainder of str_in to str_out: the current line
 * is written first, followed by each subsequently fetched line, until str_in
 * reports end-of-stream.
 */
static void
stream_dump(stream_t *str_in, stream_t *str_out)
{
        ASSERT(!(str_in->s_status & STREAM_OUTPUT));
        ASSERT(str_out->s_status & STREAM_OUTPUT);

        for (;;) {
                SOP_PUT_LINE(str_out, &str_in->s_current);

                if (SOP_EOS(str_in))
                        break;

                SOP_FETCH(str_in);
        }
}

/*
 * stream_push_to_temporary() with ST_CACHE set in flags merely pushes the
 * stream_t pointer onto the chain and returns it.  Without ST_CACHE (the
 * ST_NOCACHE case), a new output stream backed by a freshly named temporary
 * file is created and pushed onto the chain; if streamp is non-NULL, its
 * contents are written out to that file.  Stream pointers passed to
 * stream_push_to_temporary() must refer to allocated objects, and not to
 * objects created on function stacks.  If streamp == NULL, only the new, empty
 * output stream is created and pushed.  The output stream is left open if
 * ST_OPEN is set; otherwise it is freed and closed before returning.
 *
 * Returns the stream that was pushed, or NULL if the temporary file could not
 * be opened for writing (SOP_OPEN_FOR_WRITE() returned -1).
 */
stream_t *
stream_push_to_temporary(stream_t **str_chain, stream_t *streamp, int flags)
{
        stream_t *out_streamp;

        if (flags & ST_CACHE) {
                ASSERT(streamp->s_status & STREAM_ARRAY);
                stream_set(streamp, STREAM_NOT_FREEABLE | STREAM_TEMPORARY);
                stream_push_to_chain(str_chain, streamp);
                return (streamp);
        }

        out_streamp = safe_realloc(NULL, sizeof (stream_t));

        if (streamp != NULL) {
                /*
                 * Start from a shallow copy of the input stream, then turn it
                 * into an unopened stdio or wide stream according to the
                 * input's element size.  The copied buffer pointer is dropped
                 * so that the two streams do not share a buffer.
                 */
                stream_copy(out_streamp, streamp);
                stream_unset(out_streamp, STREAM_OPEN);
                ASSERT(streamp->s_element_size == sizeof (char) ||
                    streamp->s_element_size == sizeof (wchar_t));
                stream_set(out_streamp,
                    streamp->s_element_size == 1 ? STREAM_SINGLE : STREAM_WIDE);
                out_streamp->s_buffer = NULL;
                out_streamp->s_buffer_size = 0;
        } else {
                stream_clear(out_streamp);
                stream_set(out_streamp, flags & ST_WIDE ? STREAM_WIDE :
                    STREAM_SINGLE);
        }

        /*
         * Advance to the next temporary file name and take a private copy.
         * NOTE(review): the strdup() result is not checked for NULL.
         */
        (void) bump_file_template();
        out_streamp->s_filename = strdup(get_file_template());

        /*
         * NOTE(review): on this failure path neither out_streamp nor its
         * duplicated filename is released.
         */
        if (SOP_OPEN_FOR_WRITE(out_streamp) == -1)
                return (NULL);

        stream_set(out_streamp, STREAM_TEMPORARY);
        stream_push_to_chain(str_chain, out_streamp);

        if (streamp != NULL) {
                /*
                 * We reset the input stream to the beginning, and copy it in
                 * sequence to the output stream, freeing the raw_collate field
                 * as we go.
                 */
                if (SOP_PRIME(streamp) != PRIME_SUCCEEDED)
                        die(EMSG_BADPRIME);
                stream_dump(streamp, out_streamp);
        }

        if (!(flags & ST_OPEN)) {
                SOP_FREE(out_streamp);
                (void) SOP_CLOSE(out_streamp);
        }

        /*
         * Now that we've written this stream to disk, we needn't protect any
         * in-memory consumer.
         */
        if (streamp != NULL)
                streamp->s_consumer = NULL;

        return (out_streamp);
}

/*
 * stream_close_all_previous() walks backwards from the stream preceding
 * tail_streamp to the head of the chain, invoking the free operation on each
 * stream and the close operation on each stream that reports itself closable.
 * tail_streamp itself is not touched.
 */
void
stream_close_all_previous(stream_t *tail_streamp)
{
        stream_t *sp;

        ASSERT(tail_streamp != NULL);

        for (sp = tail_streamp->s_previous; sp != NULL; sp = sp->s_previous) {
                (void) SOP_FREE(sp);
                if (SOP_IS_CLOSABLE(sp))
                        (void) SOP_CLOSE(sp);
        }
}

/*
 * stream_unlink_temporary() does nothing unless the stream is flagged
 * STREAM_TEMPORARY.  For a temporary stream, the free operation is invoked,
 * and then the unlink operation if the stream's ops vector provides one.
 */
void
stream_unlink_temporary(stream_t *streamp)
{
        if (!(streamp->s_status & STREAM_TEMPORARY))
                return;

        (void) SOP_FREE(streamp);

        if (streamp->s_ops.sop_unlink)
                (void) SOP_UNLINK(streamp);
}

/*
 * stream_insert() takes input from the src stream, converts each line to
 * collatable form, and places a line_rec_t in the dest stream, which is of
 * type STREAM_ARRAY.
 *
 * The dest buffer is filled from both ends: converted collation data grows
 * upward from the start of the buffer, while line_rec_t records grow downward
 * from its end.  src must already be primed, with its current line set.
 *
 * Returns ST_MEM_AVAIL if src was exhausted before the buffer filled, and
 * ST_MEM_FILLED otherwise.  die() is called if not even a single line could
 * be converted into the buffer.
 */
int
stream_insert(sort_t *S, stream_t *src, stream_t *dest)
{
        ssize_t i = dest->s_type.LA.s_array_size;
        line_rec_t *l_series;
        char *l_convert = dest->s_buffer;
        int return_val = ST_MEM_AVAIL;
        int fetch_result = NEXT_LINE_COMPLETE;

        /*
         * Scan through until total bytes allowed accumulated, and return.
         * Use SOP_FETCH(src) so that this works for all stream types,
         * and so that we can repeat until eos.
         *
         * For each new line, we move back sizeof (line_rec_t) from the end of
         * the array buffer, and copy into the start of the array buffer.  When
         * the pointers meet, or when we exhaust the current stream, we return.
         * If we have not filled the current memory allocation, we return
         * ST_MEM_AVAIL, else we return ST_MEM_FILLED.
         */
        ASSERT(stream_is_primed(src));
        ASSERT(dest->s_status & STREAM_ARRAY);

        /*
         * l_series refers to the lowest-addressed record already stored at
         * the top of the buffer (or to the end of the buffer if there are
         * none).
         */
        /*LINTED ALIGNMENT*/
        l_series = (line_rec_t *)((caddr_t)dest->s_buffer
            + dest->s_buffer_size) - dest->s_type.LA.s_array_size;

        /*
         * If records are already present, resume conversion just past the
         * collation data of the record at l_series, leaving one element of
         * separation.
         */
        if (dest->s_type.LA.s_array_size)
                l_convert = l_series->l_collate.sp +
                    l_series->l_collate_length + src->s_element_size;

        /*
         * current line has been set prior to entry
         */
        src->s_current.l_collate.sp = l_convert;
        src->s_current.l_collate_bufsize = (caddr_t)l_series
            - (caddr_t)l_convert - sizeof (line_rec_t);
        src->s_current.l_raw_collate.sp = NULL;

        if (src->s_current.l_collate_bufsize <= 0)
                return (ST_MEM_FILLED);

        src->s_consumer = dest;

        /*
         * Convert lines while space remains and the conversion routine
         * reports success (a non-negative collation length).
         */
        while (src->s_current.l_collate_bufsize > 0 &&
            (src->s_current.l_collate_length = S->m_coll_convert(
            S->m_fields_head, &src->s_current, FCV_FAIL,
            S->m_field_separator)) >= 0) {
                ASSERT((char *)l_series > l_convert);
                l_series--;
                l_convert += src->s_current.l_collate_length;

                if ((char *)l_series <= l_convert) {
                        /*
                         * The new record slot would overlap the converted
                         * data: give the slot back and stop.
                         */
                        __S(stats_incr_insert_filled_downward());
                        l_series++;
                        return_val = ST_MEM_FILLED;
                        break;
                }

                /*
                 * There's no collision with the lower part of the buffer, so we
                 * can safely begin processing the line.  In the debug case, we
                 * test for uninitialized data by copying a non-zero pattern.
                 * (memset() stores only the low-order byte of its value
                 * argument.)
                 */
#ifdef DEBUG
                memset(l_series, 0x1ff11ff1, sizeof (line_rec_t));
#endif

                copy_line_rec(&src->s_current, l_series);
                i++;

                if (SOP_EOS(src) ||
                    (fetch_result = SOP_FETCH(src)) == NEXT_LINE_INCOMPLETE)
                        break;

                /*
                 * Point the freshly fetched line at the remaining free space.
                 */
                src->s_current.l_collate.sp = l_convert;
                src->s_current.l_collate_bufsize = (caddr_t)l_series
                    - (caddr_t)l_convert - sizeof (line_rec_t);
                src->s_current.l_raw_collate.sp = NULL;
        }

        if (fetch_result == NEXT_LINE_INCOMPLETE) {
                __S(stats_incr_insert_filled_input());
                return_val = ST_MEM_FILLED;
        } else if (src->s_current.l_collate_length < 0 ||
            src->s_current.l_collate_bufsize <= 0) {
                __S(stats_incr_insert_filled_upward());
                return_val = ST_MEM_FILLED;
        }

        if (fetch_result != NEXT_LINE_INCOMPLETE &&
            src->s_current.l_collate_length < 0 &&
            i == 0)
                /*
                 * There's no room for conversion of our only line; need to
                 * execute with larger memory.
                 */
                die(EMSG_MEMORY);

        /*
         * Set up pointer array to line records.
         */
        if (i > dest->s_type.LA.s_array_size)
                dest->s_type.LA.s_array = safe_realloc(dest->s_type.LA.s_array,
                    sizeof (line_rec_t *) * i);
        dest->s_type.LA.s_array_size = i;

        /*
         * Entry 0 refers to the lowest-addressed record, and each later entry
         * to the record that follows it in the buffer.
         */
        i = 0;
        while (i < dest->s_type.LA.s_array_size) {
                dest->s_type.LA.s_array[i] = l_series;
                l_series++;
                i++;
        }

        /*
         * LINES_ARRAY streams are always open.
         */
        stream_set(dest, STREAM_OPEN);

        return (return_val);
}

/*
 * stream_swap_buffer() trades the stream's buffer and buffer size for the
 * ones supplied through buf and size; on return *buf and *size describe the
 * stream's former buffer.  s_current is not adjusted, so this is safe only
 * for STREAM_INSTANT.
 */
void
stream_swap_buffer(stream_t *str, char **buf, size_t *size)
{
        void *incoming_buf = *buf;
        size_t incoming_size = *size;

        /* Hand the stream's current buffer back to the caller... */
        *size = str->s_buffer_size;
        *buf = str->s_buffer;

        /* ...and adopt the caller's buffer in its place. */
        str->s_buffer_size = incoming_size;
        str->s_buffer = incoming_buf;
}