root/usr/src/uts/common/rpc/xdr_rdma.c
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code is developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements to contributions from developors:
 *   Ranjit Noronha: noronha@cse.ohio-state.edu
 *   Lei Chai      : chail@cse.ohio-state.edu
 *   Weikuan Yu    : yuw@cse.ohio-state.edu
 *
 */

/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/kmem.h>
#include <sys/sdt.h>
#include <sys/debug.h>

#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>

/*
 * RCP header and xdr encoding overhead.  The number was determined by
 * tracing the msglen in svc_rdma_ksend for sec=sys,krb5,krb5i and krb5p.
 * If the XDR_RDMA_BUF_OVERHEAD is not large enough the result is the trigger
 * of the dtrace probe on the server "krpc-e-svcrdma-ksend-noreplycl" from
 * svc_rdma_ksend.
 */
#define XDR_RDMA_BUF_OVERHEAD   300

static bool_t   xdrrdma_getint32(XDR *, int32_t *);
static bool_t   xdrrdma_putint32(XDR *, int32_t *);
static bool_t   xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t   xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t          xdrrdma_getpos(XDR *);
bool_t          xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void            xdrrdma_destroy(XDR *);
static bool_t   xdrrdma_control(XDR *, int, void *);
static bool_t  xdrrdma_read_a_chunk(XDR *, CONN **);
static void xdrrdma_free_xdr_chunks(CONN *, struct clist *);

struct xdr_ops  xdrrdmablk_ops = {
        xdrrdma_getbytes,
        xdrrdma_putbytes,
        xdrrdma_getpos,
        xdrrdma_setpos,
        xdrrdma_inline,
        xdrrdma_destroy,
        xdrrdma_control,
        xdrrdma_getint32,
        xdrrdma_putint32
};

struct xdr_ops  xdrrdma_ops = {
        xdrrdma_getbytes,
        xdrrdma_putbytes,
        xdrrdma_getpos,
        xdrrdma_setpos,
        xdrrdma_inline,
        xdrrdma_destroy,
        xdrrdma_control,
        xdrrdma_getint32,
        xdrrdma_putint32
};

/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk = 0, is a
 * special case for ENCODING, which means do not chunk the incoming stream of
 * data.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message
 * covering both the inline and the chunk. This is used only in the case
 * of decoding and useful to compare read chunk 'c_xdroff' offsets.
 *
 * An example for a read chunk containing an XDR message:
 * An NFSv4 compound as following:
 *
 * PUTFH
 * WRITE [4109 bytes]
 * GETATTR
 *
 * Solaris Encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *                                   |
 *                                   v
 * [RDMA_READ chunks]:               [write data]
 *
 *
 * Linux encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *                                    |
 *                                    v
 * [RDMA_READ chunks]:                [Write data] [Write data2] [Getattr chunk]
 *                                     chunk1       chunk2         chunk3
 *
 * where the READ chunks are as:
 *
 *             - chunk1 - 4k
 * write data |
 *             - chunk2 - 13 bytes(4109 - 4k)
 * getattr op  - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 *
 */

typedef struct {
        caddr_t         xp_offp;
        int             xp_min_chunk;
        uint_t          xp_flags;       /* Controls setting for rdma xdr */
        int             xp_buf_size;    /* size of xdr buffer */
        int             xp_off;         /* overall offset */
        struct clist    *xp_rcl;        /* head of chunk list */
        struct clist    **xp_rcl_next;  /* location to place/find next chunk */
        struct clist    *xp_rcl_xdr;    /* copy of rcl containing RPC message */
        struct clist    *xp_wcl;        /* head of write chunk list */
        CONN            *xp_conn;       /* connection for chunk data xfer */
        uint_t          xp_reply_chunk_len;
        /* used to track length for security modes: integrity/privacy */
        uint_t          xp_reply_chunk_len_alt;
} xrdma_private_t;

extern kmem_cache_t *clist_cache;

bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        struct clist    *cle = *(xdrp->xp_rcl_next);
        struct clist    *rdclist = NULL, *prev = NULL;
        bool_t          retval = TRUE;
        uint32_t        cur_offset = 0;
        uint32_t        total_segments = 0;
        uint32_t        actual_segments = 0;
        uint32_t        alen;
        uint_t          total_len;

        ASSERT(xdrs->x_op != XDR_FREE);

        /*
         * first deal with the length since xdr bytes are counted
         */
        if (!xdr_u_int(xdrs, sizep)) {
                DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
                return (FALSE);
        }
        total_len = *sizep;
        if (total_len > maxsize) {
                DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
                    int, total_len, int, maxsize);
                return (FALSE);
        }
        (*conn) = xdrp->xp_conn;

        /*
         * if no data we are done
         */
        if (total_len == 0)
                return (TRUE);

        while (cle) {
                total_segments++;
                cle = cle->c_next;
        }

        cle = *(xdrp->xp_rcl_next);

        /*
         * If there was a chunk at the current offset, then setup a read
         * chunk list which records the destination address and length
         * and will RDMA READ the data in later.
         */
        if (cle == NULL)
                return (FALSE);

        if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
                return (FALSE);

        /*
         * Setup the chunk list with appropriate
         * address (offset) and length
         */
        for (actual_segments = 0;
            actual_segments < total_segments; actual_segments++) {

                DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
                    uint32_t, total_len, uint32_t, cle->c_xdroff);

                if (total_len <= 0)
                        break;

                /*
                 * not the first time in the loop
                 */
                if (actual_segments > 0)
                        cle = cle->c_next;

                cle->u.c_daddr = (uint64) cur_offset;
                alen = 0;
                if (cle->c_len > total_len) {
                        alen = cle->c_len;
                        cle->c_len = total_len;
                }
                if (!alen)
                        xdrp->xp_rcl_next = &cle->c_next;

                cur_offset += cle->c_len;
                total_len -= cle->c_len;

                if ((total_segments - actual_segments - 1) == 0 &&
                    total_len > 0) {
                        DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
                        retval = FALSE;
                }

                if ((total_segments - actual_segments - 1) > 0 &&
                    total_len == 0) {
                        DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
                            int, total_segments, int, actual_segments);
                }

                rdclist = clist_alloc();
                (*rdclist) = (*cle);
                if ((*rlist) == NULL)
                        (*rlist) = rdclist;
                if (prev == NULL)
                        prev = rdclist;
                else {
                        prev->c_next = rdclist;
                        prev = rdclist;
                }

        }

        if (prev != NULL)
                prev->c_next = NULL;

        /*
         * Adjust the chunk length, if we read only a part of
         * a chunk.
         */

        if (alen) {
                cle->w.c_saddr =
                    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
                cle->c_len = alen - cle->c_len;
        }

        return (retval);
}

/*
 * The procedure xdrrdma_create initializes a stream descriptor for a memory
 * buffer.
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
        xrdma_private_t *xdrp;
        struct clist   *cle;

        xdrs->x_op = op;
        xdrs->x_ops = &xdrrdma_ops;
        xdrs->x_base = addr;
        xdrs->x_handy = size;
        xdrs->x_public = NULL;

        xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
            KM_SLEEP);
        xdrs->x_private = (caddr_t)xdrp;
        xdrp->xp_offp = addr;
        xdrp->xp_min_chunk = min_chunk;
        xdrp->xp_flags = 0;
        xdrp->xp_buf_size = size;
        xdrp->xp_rcl = cl;
        xdrp->xp_reply_chunk_len = 0;
        xdrp->xp_reply_chunk_len_alt = 0;

        if (op == XDR_ENCODE && cl != NULL) {
                /* Find last element in chunk list and set xp_rcl_next */
                for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
                        continue;

                xdrp->xp_rcl_next = &(cle->c_next);
        } else {
                xdrp->xp_rcl_next = &(xdrp->xp_rcl);
        }

        xdrp->xp_wcl = NULL;

        xdrp->xp_conn = conn;
        if (xdrp->xp_min_chunk != 0)
                xdrp->xp_flags |= XDR_RDMA_CHUNK;
}

/* ARGSUSED */
void
xdrrdma_destroy(XDR * xdrs)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

        if (xdrp == NULL)
                return;

        if (xdrp->xp_wcl) {
                if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
                        (void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
                        rdma_buf_free(xdrp->xp_conn,
                            &xdrp->xp_wcl->rb_longbuf);
                }
                clist_free(xdrp->xp_wcl);
        }

        if (xdrp->xp_rcl) {
                if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
                        (void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
                        rdma_buf_free(xdrp->xp_conn,
                            &xdrp->xp_rcl->rb_longbuf);
                }
                clist_free(xdrp->xp_rcl);
        }

        if (xdrp->xp_rcl_xdr)
                xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

        (void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
        xdrs->x_private = NULL;
}

static  bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        int chunked = 0;

        if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
                /*
                 * check if rest of the rpc message is in a chunk
                 */
                if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
                        return (FALSE);
                }
                chunked = 1;
        }

        /* LINTED pointer alignment */
        *int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

        DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

        xdrp->xp_offp += sizeof (int32_t);

        if (chunked)
                xdrs->x_handy -= (int)sizeof (int32_t);

        if (xdrp->xp_off != 0) {
                xdrp->xp_off += sizeof (int32_t);
        }

        return (TRUE);
}

static  bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

        if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
                return (FALSE);

        /* LINTED pointer alignment */
        *(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
        xdrp->xp_offp += sizeof (int32_t);

        return (TRUE);
}

/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
 */
static  bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        struct clist    *cle = *(xdrp->xp_rcl_next);
        struct clist    *cls = *(xdrp->xp_rcl_next);
        struct clist    cl;
        bool_t          retval = TRUE;
        uint32_t        total_len = len;
        uint32_t        cur_offset = 0;
        uint32_t        total_segments = 0;
        uint32_t        actual_segments = 0;
        uint32_t        status = RDMA_SUCCESS;
        uint32_t        alen = 0;
        uint32_t        xpoff;

        while (cle) {
                total_segments++;
                cle = cle->c_next;
        }

        cle = *(xdrp->xp_rcl_next);

        if (xdrp->xp_off) {
                xpoff = xdrp->xp_off;
        } else {
                xpoff = (xdrp->xp_offp - xdrs->x_base);
        }

        /*
         * If there was a chunk at the current offset, then setup a read
         * chunk list which records the destination address and length
         * and will RDMA READ the data in later.
         */

        if (cle != NULL && cle->c_xdroff == xpoff) {
                for (actual_segments = 0;
                    actual_segments < total_segments; actual_segments++) {

                        if (total_len <= 0)
                                break;

                        if (status != RDMA_SUCCESS)
                                goto out;

                        cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
                        alen = 0;
                        if (cle->c_len > total_len) {
                                alen = cle->c_len;
                                cle->c_len = total_len;
                        }
                        if (!alen)
                                xdrp->xp_rcl_next = &cle->c_next;

                        cur_offset += cle->c_len;
                        total_len -= cle->c_len;

                        if ((total_segments - actual_segments - 1) == 0 &&
                            total_len > 0) {
                                DTRACE_PROBE(
                                    krpc__e__xdrrdma_getbytes_chunktooshort);
                                retval = FALSE;
                        }

                        if ((total_segments - actual_segments - 1) > 0 &&
                            total_len == 0) {
                                DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
                                    int, total_segments, int, actual_segments);
                        }

                        /*
                         * RDMA READ the chunk data from the remote end.
                         * First prep the destination buffer by registering
                         * it, then RDMA READ the chunk data. Since we are
                         * doing streaming memory, sync the destination
                         * buffer to CPU and deregister the buffer.
                         */
                        if (xdrp->xp_conn == NULL) {
                                return (FALSE);
                        }
                        cl = *cle;
                        cl.c_next = NULL;
                        status = clist_register(xdrp->xp_conn, &cl,
                            CLIST_REG_DST);
                        if (status != RDMA_SUCCESS) {
                                retval = FALSE;
                                /*
                                 * Deregister the previous chunks
                                 * before return
                                 */
                                goto out;
                        }

                        cle->c_dmemhandle = cl.c_dmemhandle;
                        cle->c_dsynchandle = cl.c_dsynchandle;

                        /*
                         * Now read the chunk in
                         */
                        if ((total_segments - actual_segments - 1) == 0 ||
                            total_len == 0) {
                                status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
                        } else {
                                status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
                        }
                        if (status != RDMA_SUCCESS) {
                                DTRACE_PROBE1(
                                    krpc__i__xdrrdma_getblk_readfailed,
                                    int, status);
                                retval = FALSE;
                        }

                        cle = cle->c_next;

                }

                /*
                 * sync the memory for cpu
                 */
                cl = *cls;
                cl.c_next = NULL;
                cl.c_len = cur_offset;
                if (clist_syncmem(
                    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
                        retval = FALSE;
                }
out:

                /*
                 * Deregister the chunks
                 */
                cle = cls;
                while (actual_segments != 0) {
                        cl = *cle;
                        cl.c_next = NULL;

                        cl.c_regtype = CLIST_REG_DST;
                        (void) clist_deregister(xdrp->xp_conn, &cl);

                        cle = cle->c_next;
                        actual_segments--;
                }

                if (alen) {
                        cle = *(xdrp->xp_rcl_next);
                        cle->w.c_saddr =
                            (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
                        cle->c_len = alen - cle->c_len;
                }

                return (retval);
        }

        if ((xdrs->x_handy -= len) < 0)
                return (FALSE);

        bcopy(xdrp->xp_offp, addr, len);

        xdrp->xp_offp += len;

        if (xdrp->xp_off != 0)
                xdrp->xp_off += len;

        return (TRUE);
}

/*
 * ENCODE some bytes into an XDR stream xp_min_chunk = 0, means the stream of
 * bytes contain no chunks to seperate out, and if the bytes do not fit in
 * the supplied buffer, grow the buffer and free the old buffer.
 */
static  bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        /*
         * Is this stream accepting chunks?
         * If so, does the either of the two following conditions exist?
         * - length of bytes to encode is greater than the min chunk size?
         * - remaining space in this stream is shorter than length of
         *   bytes to encode?
         *
         * If the above exists, then create a chunk for this encoding
         * and save the addresses, etc.
         */
        if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
            ((xdrp->xp_min_chunk != 0 &&
            len >= xdrp->xp_min_chunk) ||
            (xdrs->x_handy - len  < 0))) {
                struct clist    *cle;
                int             offset = xdrp->xp_offp - xdrs->x_base;

                cle = clist_alloc();
                cle->c_xdroff = offset;
                cle->c_len = len;
                cle->w.c_saddr = (uint64)(uintptr_t)addr;
                cle->c_next = NULL;

                *(xdrp->xp_rcl_next) = cle;
                xdrp->xp_rcl_next = &(cle->c_next);

                return (TRUE);
        }
        /* Is there enough space to encode what is left? */
        if ((xdrs->x_handy -= len) < 0) {
                return (FALSE);
        }
        bcopy(addr, xdrp->xp_offp, len);
        xdrp->xp_offp += len;

        return (TRUE);
}

uint_t
xdrrdma_getpos(XDR *xdrs)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

        return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}

bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

        caddr_t         newaddr = xdrs->x_base + pos;
        caddr_t         lastaddr = xdrp->xp_offp + xdrs->x_handy;
        ptrdiff_t       diff;

        if (newaddr > lastaddr)
                return (FALSE);

        xdrp->xp_offp = newaddr;
        diff = lastaddr - newaddr;
        xdrs->x_handy = (int)diff;

        return (TRUE);
}

/* ARGSUSED */
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
        rpc_inline_t    *buf = NULL;
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        struct clist    *cle = *(xdrp->xp_rcl_next);

        if (xdrs->x_op == XDR_DECODE) {
                /*
                 * Since chunks aren't in-line, check to see whether there is
                 * a chunk in the inline range.
                 */
                if (cle != NULL &&
                    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
                        return (NULL);
        }

        /* LINTED pointer alignment */
        buf = (rpc_inline_t *)xdrp->xp_offp;
        if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
                return (NULL);

        if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
            len >= xdrp->xp_min_chunk)) {
                return (NULL);
        } else {
                xdrs->x_handy -= len;
                xdrp->xp_offp += len;
                return (buf);
        }
}

static  bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
        int32_t         *int32p;
        int             len, i;
        uint_t          in_flags;
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        rdma_chunkinfo_t *rcip = NULL;
        rdma_wlist_conn_info_t *rwcip = NULL;
        rdma_chunkinfo_lengths_t *rcilp = NULL;
        struct uio *uiop;
        struct clist    *rwl = NULL, *first = NULL;
        struct clist    *prev = NULL;

        switch (request) {
        case XDR_PEEK:
                /*
                 * Return the next 4 byte unit in the XDR stream.
                 */
                if (xdrs->x_handy < sizeof (int32_t))
                        return (FALSE);

                int32p = (int32_t *)info;
                *int32p = (int32_t)ntohl((uint32_t)
                    (*((int32_t *)(xdrp->xp_offp))));

                return (TRUE);

        case XDR_SKIPBYTES:
                /*
                 * Skip the next N bytes in the XDR stream.
                 */
                int32p = (int32_t *)info;
                len = RNDUP((int)(*int32p));
                if ((xdrs->x_handy -= len) < 0)
                        return (FALSE);
                xdrp->xp_offp += len;

                return (TRUE);

        case XDR_RDMA_SET_FLAGS:
                /*
                 * Set the flags provided in the *info in xp_flags for rdma
                 * xdr stream control.
                 */
                int32p = (int32_t *)info;
                in_flags = (uint_t)(*int32p);

                xdrp->xp_flags |= in_flags;
                return (TRUE);

        case XDR_RDMA_GET_FLAGS:
                /*
                 * Get the flags provided in xp_flags return through *info
                 */
                int32p = (int32_t *)info;

                *int32p = (int32_t)xdrp->xp_flags;
                return (TRUE);

        case XDR_RDMA_GET_CHUNK_LEN:
                rcilp = (rdma_chunkinfo_lengths_t *)info;
                rcilp->rcil_len = xdrp->xp_reply_chunk_len;
                rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

                return (TRUE);

        case XDR_RDMA_ADD_CHUNK:
                /*
                 * Store wlist information
                 */

                rcip = (rdma_chunkinfo_t *)info;

                DTRACE_PROBE2(krpc__i__xdrrdma__control__add__chunk,
                    rci_type_t, rcip->rci_type, uint32, rcip->rci_len);
                switch (rcip->rci_type) {
                case RCI_WRITE_UIO_CHUNK:
                        xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

                        if ((rcip->rci_len + XDR_RDMA_BUF_OVERHEAD) <
                            xdrp->xp_min_chunk) {
                                xdrp->xp_wcl = NULL;
                                *(rcip->rci_clpp) = NULL;
                                return (TRUE);
                        }
                        uiop = rcip->rci_a.rci_uiop;

                        for (i = 0; i < uiop->uio_iovcnt; i++) {
                                rwl = clist_alloc();
                                if (first == NULL)
                                        first = rwl;
                                rwl->c_len = uiop->uio_iov[i].iov_len;
                                rwl->u.c_daddr =
                                    (uint64)(uintptr_t)
                                    (uiop->uio_iov[i].iov_base);
                                /*
                                 * if userspace address, put adspace ptr in
                                 * clist. If not, then do nothing since it's
                                 * already set to NULL (from kmem_zalloc)
                                 */
                                if (uiop->uio_segflg == UIO_USERSPACE) {
                                        rwl->c_adspc = ttoproc(curthread)->p_as;
                                }

                                if (prev == NULL)
                                        prev = rwl;
                                else {
                                        prev->c_next = rwl;
                                        prev = rwl;
                                }
                        }

                        rwl->c_next = NULL;
                        xdrp->xp_wcl = first;
                        *(rcip->rci_clpp) = first;

                        break;

                case RCI_WRITE_ADDR_CHUNK:
                        rwl = clist_alloc();

                        rwl->c_len = rcip->rci_len;
                        rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
                        rwl->c_next = NULL;
                        xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

                        xdrp->xp_wcl = rwl;
                        *(rcip->rci_clpp) = rwl;

                        break;

                case RCI_REPLY_CHUNK:
                        xdrp->xp_reply_chunk_len += rcip->rci_len;
                        break;
                }
                return (TRUE);

        case XDR_RDMA_GET_WLIST:
                *((struct clist **)info) = xdrp->xp_wcl;
                return (TRUE);

        case XDR_RDMA_SET_WLIST:
                xdrp->xp_wcl = (struct clist *)info;
                return (TRUE);

        case XDR_RDMA_GET_RLIST:
                *((struct clist **)info) = xdrp->xp_rcl;
                return (TRUE);

        case XDR_RDMA_GET_WCINFO:
                rwcip = (rdma_wlist_conn_info_t *)info;

                rwcip->rwci_wlist = xdrp->xp_wcl;
                rwcip->rwci_conn = xdrp->xp_conn;

                return (TRUE);

        default:
                return (FALSE);
        }
}

bool_t xdr_do_clist(XDR *, clist **);

/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
        if (!xdr_uint32(xdrs, &objp->c_xdroff))
                return (FALSE);
        if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
                return (FALSE);
        if (!xdr_uint32(xdrs, &objp->c_len))
                return (FALSE);
        if (!xdr_uint64(xdrs, &objp->w.c_saddr))
                return (FALSE);
        if (!xdr_do_clist(xdrs, &objp->c_next))
                return (FALSE);
        return (TRUE);
}

/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
        caddr_t loc = *pp;
        bool_t stat;

        if (loc == NULL) {
                switch (xdrs->x_op) {
                case XDR_FREE:
                        return (TRUE);

                case XDR_DECODE:
                        *pp = loc = (caddr_t)clist_alloc();
                        break;

                case XDR_ENCODE:
                        ASSERT(loc);
                        break;
                }
        }

        stat = xdr_clist(xdrs, (struct clist *)loc);

        if (xdrs->x_op == XDR_FREE) {
                kmem_cache_free(clist_cache, loc);
                *pp = NULL;
        }
        return (stat);
}

/*
 * XDR a pointer to a possibly recursive clist. This differs
 * with xdr_reference in that it can serialize/deserialiaze
 * trees correctly.
 *
 *  What is sent is actually a union:
 *
 *  union object_pointer switch (boolean b) {
 *  case TRUE: object_data data;
 *  case FALSE: void nothing;
 *  }
 *
 * > objpp: Pointer to the pointer to the object.
 *
 */

bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
        bool_t more_data;

        more_data = (*objpp != NULL);
        if (!xdr_bool(xdrs, &more_data))
                return (FALSE);
        if (!more_data) {
                *objpp = NULL;
                return (TRUE);
        }
        return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}

uint_t
xdr_getbufsize(XDR *xdrs)
{
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);

        return ((uint_t)xdrp->xp_buf_size);
}

/* ARGSUSED */
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
        bool_t  vfalse = FALSE;

        ASSERT(rlist == NULL);
        return (xdr_bool(xdrs, &vfalse));
}

bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
        bool_t          vfalse = FALSE, vtrue = TRUE;
        int             i;
        uint_t          num_segment = 0;
        struct clist    *cl;

        /* does a wlist exist? */
        if (w == NULL) {
                return (xdr_bool(xdrs, &vfalse));
        }
        /* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
        if (!xdr_bool(xdrs, &vtrue))
                return (FALSE);

        for (cl = w; cl != NULL; cl = cl->c_next) {
                num_segment++;
        }

        if (!xdr_uint32(xdrs, &num_segment))
                return (FALSE);
        for (i = 0; i < num_segment; i++) {

                DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

                if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
                        return (FALSE);

                if (!xdr_uint32(xdrs, &w->c_len))
                        return (FALSE);

                if (!xdr_uint64(xdrs, &w->u.c_daddr))
                        return (FALSE);

                w = w->c_next;
        }

        if (!xdr_bool(xdrs, &vfalse))
                return (FALSE);

        return (TRUE);
}


/*
 * Conditionally decode a RDMA WRITE chunk list from XDR stream.
 *
 * If the next boolean in the XDR stream is false there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry: allocate a struct clist and decode.
 * Pass back an indication via wlist_exists if we have seen a
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
        struct clist    *tmp;
        bool_t          more = FALSE;
        uint32_t        seg_array_len;
        uint32_t        i;

        if (!xdr_bool(xdrs, &more))
                return (FALSE);

        /* is there a wlist? */
        if (more == FALSE) {
                *wlist_exists = FALSE;
                return (TRUE);
        }
        *wlist_exists = TRUE;

        if (!xdr_uint32(xdrs, &seg_array_len))
                return (FALSE);

        tmp = *w = clist_alloc();
        for (i = 0; i < seg_array_len; i++) {

                if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
                        return (FALSE);
                if (!xdr_uint32(xdrs, &tmp->c_len))
                        return (FALSE);

                DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
                    uint_t, tmp->c_len);

                if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
                        return (FALSE);
                if (i < seg_array_len - 1) {
                        tmp->c_next = clist_alloc();
                        tmp = tmp->c_next;
                } else {
                        tmp->c_next = NULL;
                }
        }

        more = FALSE;
        if (!xdr_bool(xdrs, &more))
                return (FALSE);

        return (TRUE);
}

/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
        struct clist    *first, *ncl;
        char            *memp;
        uint32_t        num_wclist;
        uint32_t        wcl_length = 0;
        uint32_t        i;
        bool_t          more = FALSE;

        *wclp = NULL;
        *wwl = FALSE;
        *total_length = 0;

        if (!xdr_bool(xdrs, &more)) {
                return (FALSE);
        }

        if (more == FALSE) {
                return (TRUE);
        }

        *wwl = TRUE;

        if (!xdr_uint32(xdrs, &num_wclist)) {
                DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
                return (FALSE);
        }

        first = ncl = clist_alloc();

        for (i = 0; i < num_wclist; i++) {

                if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
                        goto err_out;
                if (!xdr_uint32(xdrs, &ncl->c_len))
                        goto err_out;
                if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
                        goto err_out;

                if (ncl->c_len > MAX_SVC_XFER_SIZE) {
                        DTRACE_PROBE(
                            krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
                        ncl->c_len = MAX_SVC_XFER_SIZE;
                }

                DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
                    uint_t, ncl->c_len);

                wcl_length += ncl->c_len;

                if (i < num_wclist - 1) {
                        ncl->c_next = clist_alloc();
                        ncl = ncl->c_next;
                }
        }

        if (!xdr_bool(xdrs, &more))
                goto err_out;

        first->rb_longbuf.type = RDMA_LONG_BUFFER;
        first->rb_longbuf.len =
            wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

        if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
                clist_free(first);
                return (FALSE);
        }

        memp = first->rb_longbuf.addr;

        ncl = first;
        for (i = 0; i < num_wclist; i++) {
                ncl->w.c_saddr3 = (caddr_t)memp;
                memp += ncl->c_len;
                ncl = ncl->c_next;
        }

        *wclp = first;
        *total_length = wcl_length;
        return (TRUE);

err_out:
        clist_free(first);
        return (FALSE);
}

/*
 * XDR decode the long reply write chunk.
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
        bool_t          have_rchunk = FALSE;
        struct clist    *first = NULL, *ncl = NULL;
        uint32_t        num_wclist;
        uint32_t        i;

        if (!xdr_bool(xdrs, &have_rchunk))
                return (FALSE);

        if (have_rchunk == FALSE)
                return (TRUE);

        if (!xdr_uint32(xdrs, &num_wclist)) {
                DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
                return (FALSE);
        }

        if (num_wclist == 0) {
                return (FALSE);
        }

        first = ncl = clist_alloc();

        for (i = 0; i < num_wclist; i++) {

                if (i > 0) {
                        ncl->c_next = clist_alloc();
                        ncl = ncl->c_next;
                }

                if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
                        goto err_out;
                if (!xdr_uint32(xdrs, &ncl->c_len))
                        goto err_out;
                if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
                        goto err_out;

                if (ncl->c_len > MAX_SVC_XFER_SIZE) {
                        DTRACE_PROBE(
                            krpc__e__xdrrdma__replywchunk__chunklist_toobig);
                        ncl->c_len = MAX_SVC_XFER_SIZE;
                }
                if (!(ncl->c_dmemhandle.mrc_rmr &&
                    (ncl->c_len > 0) && ncl->u.c_daddr))
                        DTRACE_PROBE(
                            krpc__e__xdrrdma__replywchunk__invalid_segaddr);

                DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
                    uint32_t, ncl->c_len);

        }
        *clist = first;
        return (TRUE);

err_out:
        clist_free(first);
        return (FALSE);
}


bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
        int             i;
        bool_t          long_reply_exists = TRUE;
        uint32_t        length;
        uint64          offset;

        if (seg_array_len > 0) {
                if (!xdr_bool(xdrs, &long_reply_exists))
                        return (FALSE);
                if (!xdr_uint32(xdrs, &seg_array_len))
                        return (FALSE);

                for (i = 0; i < seg_array_len; i++) {
                        if (!cl_longreply)
                                return (FALSE);
                        length = cl_longreply->c_len;
                        offset = (uint64) cl_longreply->u.c_daddr;

                        DTRACE_PROBE1(
                            krpc__i__xdr_encode_reply_wchunk_c_len,
                            uint32_t, length);

                        if (!xdr_uint32(xdrs,
                            &cl_longreply->c_dmemhandle.mrc_rmr))
                                return (FALSE);
                        if (!xdr_uint32(xdrs, &length))
                                return (FALSE);
                        if (!xdr_uint64(xdrs, &offset))
                                return (FALSE);
                        cl_longreply = cl_longreply->c_next;
                }
        } else {
                long_reply_exists = FALSE;
                if (!xdr_bool(xdrs, &long_reply_exists))
                        return (FALSE);
        }
        return (TRUE);
}
bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
        struct clist    *rdclist;
        struct clist    cl;
        uint_t          total_len = 0;
        uint32_t        status;
        bool_t          retval = TRUE;

        rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
        rlist->rb_longbuf.len =
            count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

        if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
                return (FALSE);
        }

        /*
         * The entire buffer is registered with the first chunk.
         * Later chunks will use the same registered memory handle.
         */

        cl = *rlist;
        cl.c_next = NULL;
        if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
                rdma_buf_free(*conn, &rlist->rb_longbuf);
                DTRACE_PROBE(
                    krpc__e__xdrrdma__readfromclient__clist__reg);
                return (FALSE);
        }

        rlist->c_regtype = CLIST_REG_DST;
        rlist->c_dmemhandle = cl.c_dmemhandle;
        rlist->c_dsynchandle = cl.c_dsynchandle;

        for (rdclist = rlist;
            rdclist != NULL; rdclist = rdclist->c_next) {
                total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
                rdclist->u.c_daddr3 =
                    (caddr_t)((char *)rlist->rb_longbuf.addr +
                    (uint32) rdclist->u.c_daddr3);
#else
                rdclist->u.c_daddr3 =
                    (caddr_t)((char *)rlist->rb_longbuf.addr +
                    (uint64) rdclist->u.c_daddr);

#endif
                cl = (*rdclist);
                cl.c_next = NULL;

                /*
                 * Use the same memory handle for all the chunks
                 */
                cl.c_dmemhandle = rlist->c_dmemhandle;
                cl.c_dsynchandle = rlist->c_dsynchandle;


                DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
                    int, rdclist->c_len);

                /*
                 * Now read the chunk in
                 */
                if (rdclist->c_next == NULL) {
                        status = RDMA_READ(*conn, &cl, WAIT);
                } else {
                        status = RDMA_READ(*conn, &cl, NOWAIT);
                }
                if (status != RDMA_SUCCESS) {
                        DTRACE_PROBE(
                            krpc__e__xdrrdma__readfromclient__readfailed);
                        rdma_buf_free(*conn, &rlist->rb_longbuf);
                        return (FALSE);
                }
        }

        cl = (*rlist);
        cl.c_next = NULL;
        cl.c_len = total_len;
        if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
                retval = FALSE;
        }
        return (retval);
}

bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
        rdma_buf_free(conn, &clp->rb_longbuf);
        clist_free(clp);
        return (TRUE);
}

bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
        int status;
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        struct xdr_ops *xops = xdrrdma_xops();
        struct clist *tcl, *wrcl, *cl;
        struct clist fcl;
        int rndup_present, rnduplen;

        rndup_present = 0;
        wrcl = NULL;

        /* caller is doing a sizeof */
        if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
                return (TRUE);

        /* copy of the first chunk */
        fcl = *wcl;
        fcl.c_next = NULL;

        /*
         * The entire buffer is registered with the first chunk.
         * Later chunks will use the same registered memory handle.
         */

        status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
        if (status != RDMA_SUCCESS) {
                return (FALSE);
        }

        wcl->c_regtype = CLIST_REG_SOURCE;
        wcl->c_smemhandle = fcl.c_smemhandle;
        wcl->c_ssynchandle = fcl.c_ssynchandle;

        /*
         * Only transfer the read data ignoring any trailing
         * roundup chunks. A bit of work, but it saves an
         * unnecessary extra RDMA_WRITE containing only
         * roundup bytes.
         */

        rnduplen = clist_len(wcl) - data_len;

        if (rnduplen) {

                tcl = wcl->c_next;

                /*
                 * Check if there is a trailing roundup chunk
                 */
                while (tcl) {
                        if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
                                rndup_present = 1;
                                break;
                        }
                        tcl = tcl->c_next;
                }

                /*
                 * Make a copy chunk list skipping the last chunk
                 */
                if (rndup_present) {
                        cl = wcl;
                        tcl = NULL;
                        while (cl) {
                                if (tcl == NULL) {
                                        tcl = clist_alloc();
                                        wrcl = tcl;
                                } else {
                                        tcl->c_next = clist_alloc();
                                        tcl = tcl->c_next;
                                }

                                *tcl = *cl;
                                cl = cl->c_next;
                                /* last chunk */
                                if (cl->c_next == NULL)
                                        break;
                        }
                        tcl->c_next = NULL;
                }
        }

        if (wrcl == NULL) {
                /* No roundup chunks */
                wrcl = wcl;
        }

        /*
         * Set the registered memory handles for the
         * rest of the chunks same as the first chunk.
         */
        tcl = wrcl->c_next;
        while (tcl) {
                tcl->c_smemhandle = fcl.c_smemhandle;
                tcl->c_ssynchandle = fcl.c_ssynchandle;
                tcl = tcl->c_next;
        }

        /*
         * Sync the total len beginning from the first chunk.
         */
        fcl.c_len = clist_len(wrcl);
        status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
        if (status != RDMA_SUCCESS) {
                return (FALSE);
        }

        status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);

        if (rndup_present)
                clist_free(wrcl);

        if (status != RDMA_SUCCESS) {
                return (FALSE);
        }

        return (TRUE);
}


/*
 * Reads one chunk at a time
 */

static bool_t
xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
{
        int status;
        int32_t len = 0;
        xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
        struct clist *cle = *(xdrp->xp_rcl_next);
        struct clist *rclp = xdrp->xp_rcl;
        struct clist *clp;

        /*
         * len is used later to decide xdr offset in
         * the chunk factoring any 4-byte XDR alignment
         * (See read chunk example top of this file)
         */
        while (rclp != cle) {
                len += rclp->c_len;
                rclp = rclp->c_next;
        }

        len = RNDUP(len) - len;

        ASSERT(xdrs->x_handy <= 0);

        /*
         * If this is the first chunk to contain the RPC
         * message set xp_off to the xdr offset of the
         * inline message.
         */
        if (xdrp->xp_off == 0)
                xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);

        if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
                return (FALSE);

        /*
         * Make a copy of the chunk to read from client.
         * Chunks are read on demand, so read only one
         * for now.
         */

        rclp = clist_alloc();
        *rclp = *cle;
        rclp->c_next = NULL;

        xdrp->xp_rcl_next = &cle->c_next;

        /*
         * If there is a roundup present, then skip those
         * bytes when reading.
         */
        if (len) {
                rclp->w.c_saddr =
                    (uint64)(uintptr_t)rclp->w.c_saddr + len;
                        rclp->c_len = rclp->c_len - len;
        }

        status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

        if (status == FALSE) {
                clist_free(rclp);
                return (status);
        }

        xdrp->xp_offp = rclp->rb_longbuf.addr;
        xdrs->x_base = xdrp->xp_offp;
        xdrs->x_handy = rclp->c_len;

        /*
         * This copy of read chunks containing the XDR
         * message is freed later in xdrrdma_destroy()
         */

        if (xdrp->xp_rcl_xdr) {
                /* Add the chunk to end of the list */
                clp = xdrp->xp_rcl_xdr;
                while (clp->c_next != NULL)
                        clp = clp->c_next;
                clp->c_next = rclp;
        } else {
                xdrp->xp_rcl_xdr = rclp;
        }
        return (TRUE);
}

static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
        struct clist *cl;

        (void) clist_deregister(conn, xdr_rcl);

        /*
         * Read chunks containing parts XDR message are
         * special: in case of multiple chunks each has
         * its own buffer.
         */

        cl = xdr_rcl;
        while (cl) {
                rdma_buf_free(conn, &cl->rb_longbuf);
                cl = cl->c_next;
        }

        clist_free(xdr_rcl);
}