// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc.  All Rights Reserved.
(c) 2009 NetApp.  All Rights Reserved.


******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS    64U

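/*
 * Report the hard upper bound on backchannel slots for this transport;
 * xprt_setup_bc() and xprt_get_bc_request() never grow the preallocation
 * pool past this limit.
 */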
unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
        return BC_MAX_SLOTS;
}

/*
 * Helper function to nullify the backchannel server pointer in the transport.
 * Setting that pointer to NULL (done after the backchannel server has been
 * shut down) must be synchronized with its use by the backchannel request
 * processing routines xprt_complete_bc_request() and
 * rpcrdma_bc_receive_call().
 */
void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv)
{
        spin_lock(&xprt->bc_pa_lock);
        svc_destroy(serv);
        xprt->bc_serv = NULL;
        spin_unlock(&xprt->bc_pa_lock);
}
EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc);

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
        return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
        struct xdr_buf *xbufp;

        dprintk("RPC:        free allocations for req= %p\n", req);
        WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
        xbufp = &req->rq_rcv_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        xbufp = &req->rq_snd_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        kfree(req);
}

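/*
 * Reset a preallocated xdr_buf back to its original single-page layout
 * before the request is returned to the bc_pa_list for reuse.
 */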
static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
{
        buf->head[0].iov_len = PAGE_SIZE;
        buf->tail[0].iov_len = 0;
        buf->pages = NULL;
        buf->page_len = 0;
        buf->flags = 0;
        buf->len = 0;
        buf->buflen = PAGE_SIZE;
}

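/*
 * Back an xdr_buf with one freshly allocated page.  Used for both the
 * send and receive buffers of a preallocated backchannel request.
 */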
static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
        struct page *page;
        /* Allocate one page to back this XDR buffer */
        page = alloc_page(gfp_flags);
        if (page == NULL)
                return -ENOMEM;
        xdr_buf_init(buf, page_address(page), PAGE_SIZE);
        return 0;
}

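/*
 * Allocate one backchannel rpc_rqst along with single-page send and
 * receive buffers.  Returns NULL on any allocation failure; a partially
 * constructed request is released via xprt_free_allocation().
 */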
static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
        gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
        struct rpc_rqst *req;

        /* Pre-allocate one backchannel rpc_rqst */
        req = kzalloc(sizeof(*req), gfp_flags);
        if (req == NULL)
                return NULL;

        req->rq_xprt = xprt;

        /* Preallocate one XDR receive buffer */
        if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc receive xbuf\n");
                goto out_free;
        }
        req->rq_rcv_buf.len = PAGE_SIZE;

        /* Preallocate one XDR send buffer */
        if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc snd xbuf\n");
                goto out_free;
        }
        return req;
out_free:
        xprt_free_allocation(req);
        return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large; for example, a pNFS server
 * using multiple deviceids.  The list can be unbounded, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests.  Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs.  The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        if (!xprt->ops->bc_setup)
                return 0;
        return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);

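/*
 * Generic implementation behind the ->bc_setup method used by socket-based
 * transports: preallocate up to min_reqs requests (capped at BC_MAX_SLOTS)
 * and splice them onto the transport's bc_pa_list.
 */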
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        struct rpc_rqst *req;
        LIST_HEAD(tmp_list);
        int i;

        dprintk("RPC:       setup backchannel transport\n");

        if (min_reqs > BC_MAX_SLOTS)
                min_reqs = BC_MAX_SLOTS;

        /*
         * We use a temporary list to keep track of the preallocated
         * buffers.  Once we're done building the list we splice it
         * into the backchannel preallocation list off of the rpc_xprt
         * struct.  This helps minimize the amount of time the list
         * lock is held on the rpc_xprt struct.  It also makes cleanup
         * easier in case of memory allocation errors.
         */
        for (i = 0; i < min_reqs; i++) {
                /* Pre-allocate one backchannel rpc_rqst */
                req = xprt_alloc_bc_req(xprt);
                if (req == NULL) {
                        printk(KERN_ERR "Failed to create bc rpc_rqst\n");
                        goto out_free;
                }

                /* Add the allocated buffer to the tmp list */
                dprintk("RPC:       adding req= %p\n", req);
                list_add(&req->rq_bc_pa_list, &tmp_list);
        }

        /*
         * Add the temporary list to the backchannel preallocation list
         */
        spin_lock(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count += min_reqs;
        xprt->bc_alloc_max += min_reqs;
        atomic_add(min_reqs, &xprt->bc_slot_count);
        spin_unlock(&xprt->bc_pa_lock);

        dprintk("RPC:       setup backchannel transport done\n");
        return 0;

out_free:
        /*
         * Memory allocation failed, free the temporary list
         */
        while (!list_empty(&tmp_list)) {
                req = list_first_entry(&tmp_list,
                                struct rpc_rqst,
                                rq_bc_pa_list);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
        }

        dprintk("RPC:       setup backchannel transport failed\n");
        return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:       the transport holding the preallocated structures
 * @max_reqs:   the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        if (xprt->ops->bc_destroy)
                xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);

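/*
 * Counterpart to xprt_setup_bc(): release up to max_reqs preallocated
 * requests from the transport's bc_pa_list and adjust the slot counters
 * accordingly.
 */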
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        struct rpc_rqst *req = NULL, *tmp = NULL;

        dprintk("RPC:        destroy backchannel transport\n");

        if (max_reqs == 0)
                goto out;

        spin_lock_bh(&xprt->bc_pa_lock);
        xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
        list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                dprintk("RPC:        req=%p\n", req);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
                xprt->bc_alloc_count--;
                atomic_dec(&xprt->bc_slot_count);
                if (--max_reqs == 0)
                        break;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);

out:
        dprintk("RPC:        backchannel list empty= %s\n",
                list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

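/*
 * Return the first preallocated request on bc_pa_list, initialized for the
 * given XID.  If the list is empty, the caller-provided "new" request is
 * added to the pool first, provided the BC_MAX_SLOTS limit has not been
 * reached.  Caller must hold xprt->bc_pa_lock.
 */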
static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
                struct rpc_rqst *new)
{
        struct rpc_rqst *req = NULL;

        dprintk("RPC:       allocate a backchannel request\n");
        if (list_empty(&xprt->bc_pa_list)) {
                if (!new)
                        goto not_found;
                if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
                        goto not_found;
                list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
        }
        req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                                rq_bc_pa_list);
        req->rq_reply_bytes_recvd = 0;
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                        sizeof(req->rq_private_buf));
        req->rq_xid = xid;
        req->rq_connect_cookie = xprt->connect_cookie;
        dprintk("RPC:       backchannel req=%p\n", req);
not_found:
        return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this callback request.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->ops->bc_free_rqst(req);
}

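/*
 * Return a completed backchannel request to the preallocation pool: clear
 * the in-use flag, then either recycle the request onto bc_pa_list or free
 * it outright if the backchannel has since been torn down.  The xprt_put()
 * here balances the xprt_get() taken in xprt_enqueue_bc_request().
 */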
void xprt_free_bc_rqst(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC:       free backchannel req=%p\n", req);

        req->rq_connect_cookie = xprt->connect_cookie - 1;
        smp_mb__before_atomic();
        clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
        smp_mb__after_atomic();

        /*
         * Return it to the list of preallocations so that it
         * may be reused by a new callback request.
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        if (xprt_need_to_requeue(xprt)) {
                xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
                xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
                req->rq_rcv_buf.len = PAGE_SIZE;
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
                req = NULL;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
        if (req != NULL) {
                /*
                 * The last remaining session was destroyed while this
                 * entry was in use.  Free the entry and don't attempt
                 * to add back to the list because there is no need to
                 * have anymore preallocated entries.
                 */
                dprintk("RPC:       Last session removed req=%p\n", req);
                xprt_free_allocation(req);
        }
        xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_lookup_bc_request() to obtain
 * one of them and xprt_free_bc_request() to return it.
 *
 * We are called in soft interrupt context, so the plain spin_lock()
 * suffices; there is no need for the bottom-half variant.
 *
 * Returns an available rpc_rqst, or NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req, *new = NULL;

        do {
                spin_lock(&xprt->bc_pa_lock);
                list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
                        if (req->rq_connect_cookie != xprt->connect_cookie)
                                continue;
                        if (req->rq_xid == xid)
                                goto found;
                }
                req = xprt_get_bc_request(xprt, xid, new);
found:
                spin_unlock(&xprt->bc_pa_lock);
                if (new) {
                        if (req != new)
                                xprt_free_allocation(new);
                        break;
                } else if (req)
                        break;
                new = xprt_alloc_bc_req(xprt);
        } while (new);
        return req;
}

/*
 * Add callback request to callback list.  Wake a thread
 * on the first pool (usually the only pool) to handle it.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->bc_pa_lock);
        list_del(&req->rq_bc_pa_list);
        xprt->bc_alloc_count--;
        spin_unlock(&xprt->bc_pa_lock);

        req->rq_private_buf.len = copied;
        set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

        dprintk("RPC:       add callback request to list\n");
        xprt_enqueue_bc_request(req);
}

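/*
 * Queue the request on the backchannel server's callback list and wake an
 * idle thread in its first pool.  bc_pa_lock is held across the bc_serv
 * dereference so that the enqueue cannot race with
 * xprt_svc_destroy_nullify_bc() clearing the pointer.
 */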
void xprt_enqueue_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct svc_serv *bc_serv;

        xprt_get(xprt);
        spin_lock(&xprt->bc_pa_lock);
        bc_serv = xprt->bc_serv;
        if (bc_serv) {
                lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
                svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
        }
        spin_unlock(&xprt->bc_pa_lock);
}
EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request);