root/drivers/md/dm-vdo/indexer/io-factory.c
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "io-factory.h"

#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/err.h>
#include <linux/mount.h>

#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"

/*
 * The I/O factory object manages access to index storage, which is a contiguous range of blocks on
 * a block device.
 *
 * The factory holds the open device and is responsible for closing it. The factory has methods to
 * make helper structures that can be used to access sections of the index.
 */
struct io_factory {
        struct block_device *bdev;
        atomic_t ref_count;
};

/* The buffered reader allows efficient I/O by reading page-sized segments into a buffer. */
struct buffered_reader {
        struct io_factory *factory;
        struct dm_bufio_client *client;
        struct dm_buffer *buffer;
        sector_t limit;
        sector_t block_number;
        u8 *start;
        u8 *end;
};

#define MAX_READ_AHEAD_BLOCKS 4

/*
 * The buffered writer allows efficient I/O by buffering writes and committing page-sized segments
 * to storage.
 */
struct buffered_writer {
        struct io_factory *factory;
        struct dm_bufio_client *client;
        struct dm_buffer *buffer;
        sector_t limit;
        sector_t block_number;
        u8 *start;
        u8 *end;
        int error;
};

static void uds_get_io_factory(struct io_factory *factory)
{
        atomic_inc(&factory->ref_count);
}

int uds_make_io_factory(struct block_device *bdev, struct io_factory **factory_ptr)
{
        int result;
        struct io_factory *factory;

        result = vdo_allocate(1, struct io_factory, __func__, &factory);
        if (result != VDO_SUCCESS)
                return result;

        factory->bdev = bdev;
        atomic_set_release(&factory->ref_count, 1);

        *factory_ptr = factory;
        return UDS_SUCCESS;
}

int uds_replace_storage(struct io_factory *factory, struct block_device *bdev)
{
        factory->bdev = bdev;
        return UDS_SUCCESS;
}

/* Free an I/O factory once all references have been released. */
void uds_put_io_factory(struct io_factory *factory)
{
        if (atomic_add_return(-1, &factory->ref_count) <= 0)
                vdo_free(factory);
}

size_t uds_get_writable_size(struct io_factory *factory)
{
        return bdev_nr_bytes(factory->bdev);
}

/* Create a struct dm_bufio_client for an index region starting at offset. */
int uds_make_bufio(struct io_factory *factory, off_t block_offset, size_t block_size,
                   unsigned int reserved_buffers, struct dm_bufio_client **client_ptr)
{
        struct dm_bufio_client *client;

        client = dm_bufio_client_create(factory->bdev, block_size, reserved_buffers, 0,
                                        NULL, NULL, 0);
        if (IS_ERR(client))
                return -PTR_ERR(client);

        dm_bufio_set_sector_offset(client, block_offset * SECTORS_PER_BLOCK);
        *client_ptr = client;
        return UDS_SUCCESS;
}

static void read_ahead(struct buffered_reader *reader, sector_t block_number)
{
        if (block_number < reader->limit) {
                sector_t read_ahead = min((sector_t) MAX_READ_AHEAD_BLOCKS,
                                          reader->limit - block_number);

                dm_bufio_prefetch(reader->client, block_number, read_ahead);
        }
}

void uds_free_buffered_reader(struct buffered_reader *reader)
{
        if (reader == NULL)
                return;

        if (reader->buffer != NULL)
                dm_bufio_release(reader->buffer);

        dm_bufio_client_destroy(reader->client);
        uds_put_io_factory(reader->factory);
        vdo_free(reader);
}

/* Create a buffered reader for an index region starting at offset. */
int uds_make_buffered_reader(struct io_factory *factory, off_t offset, u64 block_count,
                             struct buffered_reader **reader_ptr)
{
        int result;
        struct dm_bufio_client *client = NULL;
        struct buffered_reader *reader = NULL;

        result = uds_make_bufio(factory, offset, UDS_BLOCK_SIZE, 1, &client);
        if (result != UDS_SUCCESS)
                return result;

        result = vdo_allocate(1, struct buffered_reader, "buffered reader", &reader);
        if (result != VDO_SUCCESS) {
                dm_bufio_client_destroy(client);
                return result;
        }

        *reader = (struct buffered_reader) {
                .factory = factory,
                .client = client,
                .buffer = NULL,
                .limit = block_count,
                .block_number = 0,
                .start = NULL,
                .end = NULL,
        };

        read_ahead(reader, 0);
        uds_get_io_factory(factory);
        *reader_ptr = reader;
        return UDS_SUCCESS;
}

static int position_reader(struct buffered_reader *reader, sector_t block_number,
                           off_t offset)
{
        struct dm_buffer *buffer = NULL;
        void *data;

        if ((reader->end == NULL) || (block_number != reader->block_number)) {
                if (block_number >= reader->limit)
                        return UDS_OUT_OF_RANGE;

                if (reader->buffer != NULL)
                        dm_bufio_release(vdo_forget(reader->buffer));

                data = dm_bufio_read(reader->client, block_number, &buffer);
                if (IS_ERR(data))
                        return -PTR_ERR(data);

                reader->buffer = buffer;
                reader->start = data;
                if (block_number == reader->block_number + 1)
                        read_ahead(reader, block_number + 1);
        }

        reader->block_number = block_number;
        reader->end = reader->start + offset;
        return UDS_SUCCESS;
}

static size_t bytes_remaining_in_read_buffer(struct buffered_reader *reader)
{
        return (reader->end == NULL) ? 0 : reader->start + UDS_BLOCK_SIZE - reader->end;
}

static int reset_reader(struct buffered_reader *reader)
{
        sector_t block_number;

        if (bytes_remaining_in_read_buffer(reader) > 0)
                return UDS_SUCCESS;

        block_number = reader->block_number;
        if (reader->end != NULL)
                block_number++;

        return position_reader(reader, block_number, 0);
}

int uds_read_from_buffered_reader(struct buffered_reader *reader, u8 *data,
                                  size_t length)
{
        int result = UDS_SUCCESS;
        size_t chunk_size;

        while (length > 0) {
                result = reset_reader(reader);
                if (result != UDS_SUCCESS)
                        return result;

                chunk_size = min(length, bytes_remaining_in_read_buffer(reader));
                memcpy(data, reader->end, chunk_size);
                length -= chunk_size;
                data += chunk_size;
                reader->end += chunk_size;
        }

        return UDS_SUCCESS;
}

/*
 * Verify that the next data on the reader matches the required value. If the value matches, the
 * matching contents are consumed. If the value does not match, the reader state is unchanged.
 */
int uds_verify_buffered_data(struct buffered_reader *reader, const u8 *value,
                             size_t length)
{
        int result = UDS_SUCCESS;
        size_t chunk_size;
        sector_t start_block_number = reader->block_number;
        int start_offset = reader->end - reader->start;

        while (length > 0) {
                result = reset_reader(reader);
                if (result != UDS_SUCCESS) {
                        result = UDS_CORRUPT_DATA;
                        break;
                }

                chunk_size = min(length, bytes_remaining_in_read_buffer(reader));
                if (memcmp(value, reader->end, chunk_size) != 0) {
                        result = UDS_CORRUPT_DATA;
                        break;
                }

                length -= chunk_size;
                value += chunk_size;
                reader->end += chunk_size;
        }

        if (result != UDS_SUCCESS)
                position_reader(reader, start_block_number, start_offset);

        return result;
}

/* Create a buffered writer for an index region starting at offset. */
int uds_make_buffered_writer(struct io_factory *factory, off_t offset, u64 block_count,
                             struct buffered_writer **writer_ptr)
{
        int result;
        struct dm_bufio_client *client = NULL;
        struct buffered_writer *writer;

        result = uds_make_bufio(factory, offset, UDS_BLOCK_SIZE, 1, &client);
        if (result != UDS_SUCCESS)
                return result;

        result = vdo_allocate(1, struct buffered_writer, "buffered writer", &writer);
        if (result != VDO_SUCCESS) {
                dm_bufio_client_destroy(client);
                return result;
        }

        *writer = (struct buffered_writer) {
                .factory = factory,
                .client = client,
                .buffer = NULL,
                .limit = block_count,
                .start = NULL,
                .end = NULL,
                .block_number = 0,
                .error = UDS_SUCCESS,
        };

        uds_get_io_factory(factory);
        *writer_ptr = writer;
        return UDS_SUCCESS;
}

static size_t get_remaining_write_space(struct buffered_writer *writer)
{
        return writer->start + UDS_BLOCK_SIZE - writer->end;
}

static int __must_check prepare_next_buffer(struct buffered_writer *writer)
{
        struct dm_buffer *buffer = NULL;
        void *data;

        if (writer->block_number >= writer->limit) {
                writer->error = UDS_OUT_OF_RANGE;
                return UDS_OUT_OF_RANGE;
        }

        data = dm_bufio_new(writer->client, writer->block_number, &buffer);
        if (IS_ERR(data)) {
                writer->error = -PTR_ERR(data);
                return writer->error;
        }

        writer->buffer = buffer;
        writer->start = data;
        writer->end = data;
        return UDS_SUCCESS;
}

static int flush_previous_buffer(struct buffered_writer *writer)
{
        size_t available;

        if (writer->buffer == NULL)
                return writer->error;

        if (writer->error == UDS_SUCCESS) {
                available = get_remaining_write_space(writer);

                if (available > 0)
                        memset(writer->end, 0, available);

                dm_bufio_mark_buffer_dirty(writer->buffer);
        }

        dm_bufio_release(writer->buffer);
        writer->buffer = NULL;
        writer->start = NULL;
        writer->end = NULL;
        writer->block_number++;
        return writer->error;
}

void uds_free_buffered_writer(struct buffered_writer *writer)
{
        int result;

        if (writer == NULL)
                return;

        flush_previous_buffer(writer);
        result = -dm_bufio_write_dirty_buffers(writer->client);
        if (result != UDS_SUCCESS)
                vdo_log_warning_strerror(result, "%s: failed to sync storage", __func__);

        dm_bufio_client_destroy(writer->client);
        uds_put_io_factory(writer->factory);
        vdo_free(writer);
}

/*
 * Append data to the buffer, writing as needed. If no data is provided, zeros are written instead.
 * If a write error occurs, it is recorded and returned on every subsequent write attempt.
 */
int uds_write_to_buffered_writer(struct buffered_writer *writer, const u8 *data,
                                 size_t length)
{
        int result = writer->error;
        size_t chunk_size;

        while ((length > 0) && (result == UDS_SUCCESS)) {
                if (writer->buffer == NULL) {
                        result = prepare_next_buffer(writer);
                        continue;
                }

                chunk_size = min(length, get_remaining_write_space(writer));
                if (data == NULL) {
                        memset(writer->end, 0, chunk_size);
                } else {
                        memcpy(writer->end, data, chunk_size);
                        data += chunk_size;
                }

                length -= chunk_size;
                writer->end += chunk_size;

                if (get_remaining_write_space(writer) == 0)
                        result = uds_flush_buffered_writer(writer);
        }

        return result;
}

int uds_flush_buffered_writer(struct buffered_writer *writer)
{
        if (writer->error != UDS_SUCCESS)
                return writer->error;

        return flush_previous_buffer(writer);
}