root/usr/src/cmd/sort/merge.c
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 1998-2003 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#include "merge.h"

/*
 * External merge sort
 *
 *   The following code implements the merge phase of sort(1) using a heap-based
 *   priority queue.  Fast paths for merging two files as well as outputting a
 *   single file are provided.
 *
 * Memory footprint management
 *
 *   The N-way fan-out of the merge phase can lead to compromising memory
 *   consumption if not constrained, so two mechanisms are used to regulate
 *   the memory footprint during the merge phase:
 *
 *   1.  Single use memory advice.  Since we proceed through each merge file in
 *       order, any line we have output is never required again--at least, not
 *       from that input file.  Accordingly, we use the SOP_RELEASE_LINE()
 *       operation to advise that the memory backing the raw data for the stream
 *       up to that line is no longer of interest.  (For certain classes of
 *       streams, this leads to an madvise(3C) call with the MADV_DONTNEED
 *       flag.)
 *
 *   2.  Number of merge files.  The number of merge files is constrained based
 *       on the amount of physical memory specified via the -S option (or deemed
 *       available based on an inquiry of sysconf(3C) for _SC_AVPHYS_PAGES).
 *       The number of merge files is calculated based on the average resident
 *       size of a stream that supports the SOP_RELEASE_LINE() operation; this
 *       number is conservative for streams that do not support this operation.
 *       A minimum of four subfiles will always be used, resource limits
 *       permitting.
 *
 * Temporary filespace footprint management
 *
 *   Once the merge sort has utilized a temporary file, it may be deleted at
 *   close, as it's not used again and preserving the files until exit may
 *   compromise sort completion when limited temporary space is available.
 */

static int pq_N;
static stream_t **pq_queue;
static int (*pq_coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);

static ssize_t (*mg_coll_convert)(field_t *, line_rec_t *, flag_t, vchar_t);

/*
 * prepare_output_stream() resets the given stream and configures it as the
 * destination of the merge:  the stream is marked single-byte or wide
 * according to the locale, marked unique if unique lines were requested, and
 * pointed at either standard output or the requested output file.
 *
 * Returns the result of SOP_OPEN_FOR_WRITE() on the configured stream; the
 * callers in this file treat -1 as failure.
 */
static int
prepare_output_stream(stream_t *ostrp, sort_t *S)
{
        /* Start from a clean slate that is explicitly not yet open. */
        stream_clear(ostrp);
        stream_unset(ostrp, STREAM_OPEN);

        stream_set(ostrp,
            (S->m_single_byte_locale ? STREAM_SINGLE : STREAM_WIDE) |
            (S->m_unique_lines ? STREAM_UNIQUE : 0));

        /* Standard output is additionally flagged as not being a file. */
        if (S->m_output_to_stdout)
                stream_set(ostrp, STREAM_NOTFILE);

        ostrp->s_filename = S->m_output_to_stdout ?
            (char *)filename_stdout : S->m_output_filename;

        return (SOP_OPEN_FOR_WRITE(ostrp));
}

/*
 * merge_one_stream() is the fast path for a single input:  every line of strp
 * is copied, in order, to outstrp.  A collation key is generated for the first
 * line and for any subsequently fetched line whose l_collate_length is zero.
 * If the stream cannot be primed, nothing is written and neither stream is
 * closed or flushed.
 */
static void
merge_one_stream(field_t *fields_chain, stream_t *strp, stream_t *outstrp,
    vchar_t field_separator)
{
        size_t initial_size = INITIAL_COLLATION_SIZE * strp->s_element_size;

        if (strp->s_status & (STREAM_SINGLE | STREAM_WIDE))
                stream_set(strp, STREAM_INSTANT);

        if (SOP_PRIME(strp) != PRIME_SUCCEEDED)
                return;

        /*
         * Give the current line a fresh collation buffer; FCV_REALLOC is
         * passed to the conversion routine for this and every later line.
         */
        strp->s_current.l_collate_bufsize = initial_size;
        strp->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
        (void) mg_coll_convert(fields_chain, &strp->s_current, FCV_REALLOC,
            field_separator);

        for (;;) {
                /* Emit the current line, then advise that it is done with. */
                SOP_PUT_LINE(outstrp, &strp->s_current);
                SOP_RELEASE_LINE(strp);

                if (SOP_EOS(strp))
                        break;

                SOP_FETCH(strp);
                if (strp->s_current.l_collate_length == 0)
                        (void) mg_coll_convert(fields_chain,
                            &strp->s_current, FCV_REALLOC, field_separator);
        }

        (void) SOP_CLOSE(strp);
        SOP_FLUSH(outstrp);
}

/*
 * merge_two_streams() is the fast path for exactly two inputs.  Both streams
 * are primed (str_a first, then str_b); if only one primes successfully the
 * work is handed to merge_one_stream(), and if neither does the function
 * returns without output.
 *
 * Otherwise the current lines of the two streams are compared repeatedly:  the
 * line from str_a is emitted when it collates strictly before the line from
 * str_b, and the line from str_b is emitted in every other case.  As soon as
 * one stream reaches end-of-stream it is closed and the remainder of the other
 * is copied to outstrp, which is then flushed.
 */
static void
merge_two_streams(field_t *fields_chain, stream_t *str_a, stream_t *str_b,
    stream_t *outstrp, vchar_t field_separator, flag_t coll_flags)
{
        int (*collate_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t);
        size_t initial_size = INITIAL_COLLATION_SIZE * str_a->s_element_size;
        stream_t *rest;
        int a_primed;
        int b_primed;

        ASSERT(str_a->s_element_size == str_b->s_element_size);

        collate_fcn = (str_a->s_element_size == sizeof (char)) ?
            collated : collated_wide;

        if (str_a->s_status & (STREAM_SINGLE | STREAM_WIDE))
                stream_set(str_a, STREAM_INSTANT);
        if (str_b->s_status & (STREAM_SINGLE | STREAM_WIDE))
                stream_set(str_b, STREAM_INSTANT);

        a_primed = (SOP_PRIME(str_a) == PRIME_SUCCEEDED);
        b_primed = (SOP_PRIME(str_b) == PRIME_SUCCEEDED);

        if (!a_primed || !b_primed) {
                /* At most one usable input:  degrade to a straight copy. */
                if (a_primed)
                        merge_one_stream(fields_chain, str_a, outstrp,
                            field_separator);
                else if (b_primed)
                        merge_one_stream(fields_chain, str_b, outstrp,
                            field_separator);
                return;
        }

        str_a->s_current.l_collate_bufsize = initial_size;
        str_b->s_current.l_collate_bufsize = initial_size;

        str_a->s_current.l_collate.sp = safe_realloc(NULL, initial_size);
        str_b->s_current.l_collate.sp = safe_realloc(NULL, initial_size);

        (void) mg_coll_convert(fields_chain, &str_a->s_current, FCV_REALLOC,
            field_separator);
        (void) mg_coll_convert(fields_chain, &str_b->s_current, FCV_REALLOC,
            field_separator);

        for (;;) {
                int a_wins = collate_fcn(&str_a->s_current, &str_b->s_current,
                    0, coll_flags) < 0;
                stream_t *winner = a_wins ? str_a : str_b;

                SOP_PUT_LINE(outstrp, &winner->s_current);
                SOP_RELEASE_LINE(winner);

                if (SOP_EOS(winner)) {
                        /* One input is exhausted; the other is drained below. */
                        (void) SOP_CLOSE(winner);
                        rest = a_wins ? str_b : str_a;
                        break;
                }

                SOP_FETCH(winner);
                if (winner->s_current.l_collate_length == 0)
                        (void) mg_coll_convert(fields_chain,
                            &winner->s_current, FCV_REALLOC, field_separator);
        }

        /*
         * The surviving stream still holds a current line that has not been
         * written, so emit before testing for end-of-stream.
         */
        for (;;) {
                SOP_PUT_LINE(outstrp, &rest->s_current);
                SOP_RELEASE_LINE(rest);

                if (SOP_EOS(rest))
                        break;

                SOP_FETCH(rest);
                if (rest->s_current.l_collate_length == 0)
                        (void) mg_coll_convert(fields_chain, &rest->s_current,
                            FCV_REALLOC, field_separator);
        }

        (void) SOP_CLOSE(rest);
        SOP_FLUSH(outstrp);
}

/*
 * priority queue routines
 *   used for merges involving more than two sources
 */
/*
 * heap_up() sifts the entry at index k of the one-based heap array A towards
 * the root:  while the entry's parent (index k / 2) compares greater than the
 * entry under pq_coll_fcn, the two are exchanged and the walk continues from
 * the parent's position.
 *
 * The exchange is made through A, so the array that is compared is always the
 * array that is reordered.  (Callers in this file pass pq_queue.)
 */
static void
heap_up(stream_t **A, int k, flag_t coll_flags)
{
        while (k > 1 &&
            pq_coll_fcn(&A[k / 2]->s_current, &A[k]->s_current, 0,
            coll_flags) > 0) {
                swap((void **)&A[k], (void **)&A[k / 2]);
                k /= 2;
        }
}

/*
 * heap_down() sifts the entry at index k of the one-based heap array A towards
 * the leaves, considering only indices 1 through N.  At each step the smaller
 * of the entry's children (under pq_coll_fcn) is selected; if the entry does
 * not compare greater than that child the walk stops, otherwise the two are
 * exchanged and the walk continues from the child's position.
 *
 * The exchange is made through A, so the array that is compared is always the
 * array that is reordered.  (Callers in this file pass pq_queue.)
 */
static void
heap_down(stream_t **A, int k, int N, flag_t coll_flags)
{
        int     j;

        while (2 * k <= N) {
                j = 2 * k;
                /* Prefer the right child when it sorts before the left. */
                if (j < N && pq_coll_fcn(&A[j]->s_current,
                    &A[j + 1]->s_current, 0, coll_flags) > 0)
                        j++;
                if (pq_coll_fcn(&A[k]->s_current, &A[j]->s_current, 0,
                    coll_flags) <= 0)
                        break;
                swap((void **)&A[k], (void **)&A[j]);
                k = j;
        }
}

/*
 * pqueue_empty() returns non-zero when the priority queue holds no streams.
 * The parameter list is spelled (void) so that the definition provides a
 * prototype and calls with arguments are diagnosed.
 */
static int
pqueue_empty(void)
{
        return (pq_N == 0);
}

/*
 * pqueue_init() records the collation function used to order the queue, marks
 * the queue as empty, and allocates the backing array.  One slot more than
 * max_size is allocated because the heap routines index from 1.
 */
static void
pqueue_init(size_t max_size,
    int (*coll_fcn)(line_rec_t *, line_rec_t *, ssize_t, flag_t))
{
        size_t n_slots = max_size + 1;

        pq_coll_fcn = coll_fcn;
        pq_N = 0;
        pq_queue = safe_realloc(NULL, n_slots * sizeof (stream_t *));
}

/*
 * pqueue_insert() appends the given stream at the next free slot of the queue
 * and then sifts it up into position with heap_up().
 */
static void
pqueue_insert(stream_t *source, flag_t coll_flags)
{
        pq_N++;
        pq_queue[pq_N] = source;

        heap_up(pq_queue, pq_N, coll_flags);
}

/*
 * pqueue_head() removes and returns the stream at the root of the heap.  The
 * root is exchanged with the last entry, the shortened heap is repaired with
 * heap_down(), and the queue length is reduced by one.  Callers are expected
 * to check pqueue_empty() first; no emptiness check is made here.
 */
static stream_t *
pqueue_head(flag_t coll_flags)
{
        int last = pq_N;
        stream_t *head;

        swap((void **)&pq_queue[1], (void **)&pq_queue[last]);
        heap_down(pq_queue, 1, last - 1, coll_flags);

        head = pq_queue[last];
        pq_N = last - 1;

        return (head);
}

/*
 * merge_n_streams() merges the chain of streams beginning at head_streamp
 * using the heap-based priority queue, working in one or more passes.
 *
 * In each pass, streams are opened for reading starting at bot_streamp until
 * the chain ends, the open count exceeds threshold_opens, or
 * stream_open_for_read() returns -1.  The streams in [top_streamp,
 * bot_streamp) are then primed, merged via the priority queue, and released.
 *
 * If the end of the chain was reached (bot_streamp == NULL), the pass writes
 * to out_streamp and is the final pass.  Otherwise the pass writes to a stream
 * obtained from stream_push_to_temporary() on the same chain, and the next
 * pass resumes from bot_streamp.
 *
 * The function calls die() if the output stream cannot be prepared, if no
 * temporary output stream can be obtained, if a pass would cover no input
 * streams, or if a non-empty stream fails to prime.
 */
static void
merge_n_streams(sort_t *S, stream_t *head_streamp, int n_streams,
    stream_t *out_streamp, flag_t coll_flags)
{
        stream_t *top_streamp;
        stream_t *cur_streamp;
        stream_t *bot_streamp;
        stream_t *loop_out_streamp;
        flag_t is_single_byte = S->m_single_byte_locale;

        int n_opens = 0;
        int threshold_opens;

        /*
         * Limit on streams opened per pass, derived from available memory but
         * never less than four.
         */
        threshold_opens = MAX(4,
            2 * S->m_memory_available / DEFAULT_RELEASE_SIZE);

        pqueue_init(n_streams, is_single_byte ? collated : collated_wide);

        top_streamp = bot_streamp = head_streamp;

        for (;;) {
                /*
                 * NOTE(review): hold_file_descriptor() and
                 * release_file_descriptor() are defined elsewhere; presumably
                 * they reserve a descriptor while inputs are opened so that
                 * one remains for this pass's output stream -- confirm.
                 */
                hold_file_descriptor();
                while (bot_streamp != NULL) {

                        if (n_opens > threshold_opens ||
                            stream_open_for_read(S, bot_streamp) == -1) {
                                /*
                                 * Available file descriptors would exceed
                                 * memory target or have been exhausted; back
                                 * off to the last valid, primed stream.
                                 */
                                bot_streamp = bot_streamp->s_previous;
                                break;
                        }

                        if (bot_streamp->s_status & STREAM_SINGLE ||
                            bot_streamp->s_status & STREAM_WIDE)
                                stream_set(bot_streamp, STREAM_INSTANT);

                        bot_streamp = bot_streamp->s_next;
                        n_opens++;
                }
                release_file_descriptor();

                if (bot_streamp == NULL) {
                        /*
                         * Every remaining stream was opened:  this pass writes
                         * the final output.
                         */
                        if (prepare_output_stream(out_streamp, S) != -1)
                                loop_out_streamp = out_streamp;
                        else
                                die(EMSG_DESCRIPTORS);
                } else {
                        /*
                         * Only part of the chain could be opened:  this pass
                         * writes to an intermediate temporary stream.
                         */
                        loop_out_streamp = stream_push_to_temporary(
                            &head_streamp, NULL, ST_OPEN | ST_NOCACHE |
                            (is_single_byte ? 0 : ST_WIDE));

                        if (loop_out_streamp == NULL ||
                            top_streamp == bot_streamp)
                                /*
                                 * We need three file descriptors to make
                                 * progress; if top_streamp == bot_streamp, then
                                 * we have only two.
                                 */
                                die(EMSG_DESCRIPTORS);
                }

                /*
                 * Prime each stream of this pass, build the collation key for
                 * its first line, and place it on the priority queue.
                 */
                for (cur_streamp = top_streamp; cur_streamp != bot_streamp;
                    cur_streamp = cur_streamp->s_next) {
                        /*
                         * Empty stream?
                         */
                        if (!(cur_streamp->s_status & STREAM_ARRAY) &&
                            SOP_EOS(cur_streamp)) {
                                stream_unlink_temporary(cur_streamp);
                                continue;
                        }

                        /*
                         * Given that stream is not empty, any error in priming
                         * must be fatal.
                         */
                        if (SOP_PRIME(cur_streamp) != PRIME_SUCCEEDED)
                                die(EMSG_BADPRIME);

                        /*
                         * NOTE(review): unlike merge_one_stream() and
                         * merge_two_streams(), the initial buffer size here is
                         * not scaled by s_element_size.  FCV_REALLOC is passed
                         * to the conversion, which presumably allows the
                         * buffer to grow as needed -- confirm.
                         */
                        cur_streamp->s_current.l_collate_bufsize =
                            INITIAL_COLLATION_SIZE;
                        cur_streamp->s_current.l_collate.sp =
                            safe_realloc(NULL, INITIAL_COLLATION_SIZE);
                        (void) mg_coll_convert(S->m_fields_head,
                            &cur_streamp->s_current, FCV_REALLOC,
                            S->m_field_separator);

                        pqueue_insert(cur_streamp, coll_flags);
                }

                /*
                 * Repeatedly take the stream at the head of the queue, write
                 * its current line, and requeue the stream with its next line
                 * unless it has reached end-of-stream.
                 */
                while (!pqueue_empty()) {
                        cur_streamp = pqueue_head(coll_flags);

                        SOP_PUT_LINE(loop_out_streamp, &cur_streamp->s_current);
                        SOP_RELEASE_LINE(cur_streamp);

                        if (!SOP_EOS(cur_streamp)) {
                                SOP_FETCH(cur_streamp);
                                (void) mg_coll_convert(S->m_fields_head,
                                    &cur_streamp->s_current, FCV_REALLOC,
                                    S->m_field_separator);
                                pqueue_insert(cur_streamp, coll_flags);
                        }
                }

                /*
                 * Release the resources of every stream consumed by this pass.
                 * The collation buffer is freed only for streams that are not
                 * STREAM_ARRAY.
                 */
                cur_streamp = top_streamp;
                while (cur_streamp != bot_streamp) {
                        if (!(cur_streamp->s_status & STREAM_ARRAY))
                                safe_free(cur_streamp->s_current.l_collate.sp);
                        cur_streamp->s_current.l_collate.sp = NULL;

                        (void) SOP_FREE(cur_streamp);
                        stream_unlink_temporary(cur_streamp);
                        (void) SOP_CLOSE(cur_streamp);

                        cur_streamp = cur_streamp->s_next;
                }

                (void) SOP_FLUSH(loop_out_streamp);

                /* The final pass has been written; nothing remains. */
                if (bot_streamp == NULL)
                        break;

                if (!(loop_out_streamp->s_status & STREAM_NOTFILE)) {
                        (void) SOP_CLOSE(loop_out_streamp);
                        /*
                         * Get file size so that we may treat intermediate files
                         * with our stream_mmap facilities.
                         */
                        stream_stat_chain(loop_out_streamp);
                        __S(stats_incr_merge_files());
                }

                /* Begin the next pass at the first stream not yet merged. */
                n_opens = 0;

                top_streamp = bot_streamp;
                bot_streamp = bot_streamp->s_next;
        }
}

/*
 * merge() is the entry point of the merge phase.  It selects the chain of
 * streams to merge (the input streams when only merging, otherwise the
 * temporary streams left by the internal sort), derives the collation flags
 * from the sort options, and dispatches on the number of streams:  none, the
 * one- and two-stream fast paths, or the general N-way merge.
 *
 * In every path that writes output, failure to prepare the output stream is
 * fatal and reported with EMSG_DESCRIPTORS.
 */
void
merge(sort_t *S)
{
        stream_t *merge_chain;
        stream_t *cur_streamp;
        stream_t out_stream;
        uint_t n_merges;
        flag_t coll_flags;

        if (S->m_merge_only) {
                merge_chain = S->m_input_streams;
                set_cleanup_chain(&S->m_input_streams);
        } else {
                /*
                 * Otherwise we're inheriting the temporary output files from
                 * our internal sort.
                 */
                merge_chain = S->m_temporary_streams;
                stream_stat_chain(merge_chain);
                __S(stats_set_merge_files(stream_count_chain(merge_chain)));
        }

        if (S->m_field_options & FIELD_REVERSE_COMPARISONS)
                coll_flags = COLL_REVERSE;
        else
                coll_flags = 0;
        if (S->m_entire_line)
                coll_flags |= COLL_UNIQUE;

        n_merges = stream_count_chain(merge_chain);

        /* Publish the conversion routine for the static merge helpers. */
        mg_coll_convert = S->m_coll_convert;
        cur_streamp = merge_chain;

        switch (n_merges) {
                case 0:
                        /*
                         * No files for merge.
                         */
                        warn(gettext("no files available to merge\n"));
                        break;
                case 1:
                        /*
                         * Fast path: only one file for merge.  As in the other
                         * paths, an output stream that cannot be opened is
                         * fatal rather than silently ignored.
                         */
                        (void) stream_open_for_read(S, cur_streamp);
                        if (prepare_output_stream(&out_stream, S) == -1)
                                die(EMSG_DESCRIPTORS);
                        merge_one_stream(S->m_fields_head, cur_streamp,
                            &out_stream, S->m_field_separator);
                        break;
                case 2:
                        /*
                         * Fast path: only two files for merge.
                         */
                        (void) stream_open_for_read(S, cur_streamp);
                        (void) stream_open_for_read(S, cur_streamp->s_next);
                        if (prepare_output_stream(&out_stream, S) == -1)
                                die(EMSG_DESCRIPTORS);
                        merge_two_streams(S->m_fields_head, cur_streamp,
                            cur_streamp->s_next, &out_stream,
                            S->m_field_separator, coll_flags);
                        break;
                default:
                        /*
                         * Full merge.
                         */
                        merge_n_streams(S, cur_streamp, n_merges, &out_stream,
                            coll_flags);
                        break;
        }

        remove_output_guard();
}