#include "dedupe.h"
#include <linux/atomic.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/spinlock.h>
#include <linux/timer.h>
#include "logger.h"
#include "memory-alloc.h"
#include "numeric.h"
#include "permassert.h"
#include "string-utils.h"
#include "indexer.h"
#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "int-map.h"
#include "io-submitter.h"
#include "packer.h"
#include "physical-zone.h"
#include "slab-depot.h"
#include "statistics.h"
#include "types.h"
#include "vdo.h"
#include "wait-queue.h"
/* States of the per-zone dedupe query expiration timer. */
#define DEDUPE_QUERY_TIMER_IDLE 0
#define DEDUPE_QUERY_TIMER_RUNNING 1
#define DEDUPE_QUERY_TIMER_FIRED 2
/*
 * States of a single dedupe (UDS index) query context. Transitions are made
 * with atomic_cmpxchg (see change_context_state) since the timer, the index
 * callback, and the requesting data_vio may race.
 */
enum dedupe_context_state {
	DEDUPE_CONTEXT_IDLE,
	DEDUPE_CONTEXT_PENDING,
	DEDUPE_CONTEXT_TIMED_OUT,
	DEDUPE_CONTEXT_COMPLETE,
	/* presumably: query timed out and the late answer has now arrived — confirm */
	DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
};
/*
 * Coarse state of the UDS index session. IS_CHANGING is set (under
 * zones->lock) while a blocking open or close is in flight.
 */
enum index_state {
	IS_CLOSED,
	IS_CHANGING,
	IS_OPENED,
};
/* Human-readable dedupe index status strings (reported to the user). */
static const char *CLOSED = "closed";
static const char *CLOSING = "closing";
static const char *ERROR = "error";
static const char *OFFLINE = "offline";
static const char *ONLINE = "online";
static const char *OPENING = "opening";
static const char *SUSPENDED = "suspended";
static const char *UNKNOWN = "unknown";
/*
 * Encoding of dedupe advice stored in the index: one version byte, one
 * mapping-state byte, and a little-endian 64-bit PBN (see decode_uds_advice).
 */
#define UDS_ADVICE_VERSION 2
#define UDS_ADVICE_SIZE (1 + 1 + sizeof(u64))
/*
 * States of the hash lock state machine. A lock is driven through these by
 * its "agent" data_vio; the start_*() functions below perform the
 * transitions.
 */
enum hash_lock_state {
	/* Initial state; no query has been made yet */
	VDO_HASH_LOCK_INITIALIZING,
	/* Waiting on a UDS index query */
	VDO_HASH_LOCK_QUERYING,
	/* Agent is writing (possibly compressing) its data */
	VDO_HASH_LOCK_WRITING,
	/* Updating the index with new advice */
	VDO_HASH_LOCK_UPDATING,
	/* Acquiring the PBN read lock on the duplicate block */
	VDO_HASH_LOCK_LOCKING,
	/* Reading the candidate block to verify the advice */
	VDO_HASH_LOCK_VERIFYING,
	/* Sharing the verified duplicate among waiters */
	VDO_HASH_LOCK_DEDUPING,
	/* Releasing the duplicate PBN lock */
	VDO_HASH_LOCK_UNLOCKING,
	/* Terminal state: no dedupe; each data_vio proceeds alone */
	VDO_HASH_LOCK_BYPASSING,
};
/* State names for logging, indexed by state; listed alphabetically here. */
static const char * const LOCK_STATE_NAMES[] = {
	[VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
	[VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
	[VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
	[VDO_HASH_LOCK_LOCKING] = "LOCKING",
	[VDO_HASH_LOCK_QUERYING] = "QUERYING",
	[VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
	[VDO_HASH_LOCK_UPDATING] = "UPDATING",
	[VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
	[VDO_HASH_LOCK_WRITING] = "WRITING",
};
/*
 * A lock on all blocks sharing one record name (hash), serializing
 * deduplication decisions for that hash within a zone.
 */
struct hash_lock {
	/* The hash this lock covers */
	struct uds_record_name hash;
	/* Entry in the zone's free pool when idle (see lock_pool) */
	struct list_head pool_node;
	/* All data_vios currently holding this lock */
	struct list_head duplicate_vios;
	/* Number of data_vios referencing this lock */
	data_vio_count_t reference_count;
	/* High-water mark of reference_count (statistics) */
	data_vio_count_t max_references;
	/* Current state in the hash lock state machine */
	enum hash_lock_state state;
	/* Whether the index should be updated with new advice */
	bool update_advice;
	/* Whether the duplicate location has been verified by reading it */
	bool verified;
	/* Whether the verify result has been counted in statistics yet */
	bool verify_counted;
	/* Whether this lock is registered in the zone's hash_lock_map */
	bool registered;
	/* The verified location of the duplicate data */
	struct zoned_pbn duplicate;
	/* Read lock held on the duplicate's physical block, if any */
	struct pbn_lock *duplicate_lock;
	/* The data_vio currently acting on behalf of the lock */
	struct data_vio *agent;
	/* data_vios waiting for the agent to finish its current step */
	struct vdo_wait_queue waiters;
};
/* One pre-allocated hash lock per possible concurrent user vio. */
#define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS

/* Top-level dedupe state: the UDS index session plus all hash zones. */
struct hash_zones {
	struct action_manager *manager;
	/* Parameters used to open the UDS index */
	struct uds_parameters parameters;
	struct uds_index_session *index_session;
	/* Rate-limits logging of query timeouts */
	struct ratelimit_state ratelimiter;
	/* Total number of timed-out index queries */
	atomic64_t timeouts;
	atomic64_t dedupe_context_busy;
	/* Protects the index state fields below */
	spinlock_t lock;
	struct vdo_completion completion;
	/* Current and desired index session state */
	enum index_state index_state;
	enum index_state index_target;
	struct admin_state state;
	/* True while an open/close toward index_target is in progress */
	bool changing;
	/* Request to create (rather than load) the index on next open */
	bool create_flag;
	bool dedupe_flag;
	/* Set when an index open/close failed */
	bool error_flag;
	/* Timeouts already logged (see report_dedupe_timeouts) */
	u64 reported_timeouts;
	zone_count_t zone_count;
	/* Flexible array of per-thread hash zones */
	struct hash_zone zones[];
};
/* Milliseconds before an outstanding index query is timed out. */
unsigned int vdo_dedupe_index_timeout_interval = 5000;
/* Minimum milliseconds between expiration timer firings. */
unsigned int vdo_dedupe_index_min_timer_interval = 100;
/* The above intervals converted to jiffies. */
static u64 vdo_dedupe_index_timeout_jiffies;
static u64 vdo_dedupe_index_min_timer_jiffies;
/* Convert a generic completion to the hash_zone embedding it, with a type check. */
static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
	return container_of(completion, struct hash_zone, completion);
}
/* Convert a generic completion to the hash_zones embedding it, with a type check. */
static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
	return container_of(completion, struct hash_zones, completion);
}
/* Assert that the caller is running on the zone's dedicated thread. */
static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
			    "%s called on hash zone thread", name);
}
/*
 * Atomically move a dedupe context from state old to new. Returns true if
 * this caller won the transition (the state really was old).
 */
static inline bool change_context_state(struct dedupe_context *context, int old, int new)
{
	return (atomic_cmpxchg(&context->state, old, new) == old);
}
/*
 * Atomically move a zone's query timer from state old to new. Returns true
 * if this caller won the transition.
 */
static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
{
	return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
}
/*
 * Reset a hash lock to a pristine state and return it to the zone's free
 * pool. The memset must come first; it wipes the list heads and wait queue,
 * which are then re-initialized before the lock is pooled.
 */
static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
{
	memset(lock, 0, sizeof(*lock));
	INIT_LIST_HEAD(&lock->pool_node);
	INIT_LIST_HEAD(&lock->duplicate_vios);
	vdo_waitq_init(&lock->waiters);
	list_add_tail(&lock->pool_node, &zone->lock_pool);
}
struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
{
if (data_vio->hash_lock == NULL)
return NULL;
return data_vio->hash_lock->duplicate_lock;
}
/*
 * Derive the int-map key for a hash lock: the first 8 bytes of its record
 * name, read little-endian.
 */
static inline u64 hash_lock_key(struct hash_lock *lock)
{
	return get_unaligned_le64(&lock->hash.name);
}
/* Get the printable name of a hash lock state, or "INVALID" if out of range. */
static const char *get_hash_lock_state_name(enum hash_lock_state state)
{
	/* Catch if a state is added without updating the name array. */
	BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
	if (state >= ARRAY_SIZE(LOCK_STATE_NAMES))
		return "INVALID";

	return LOCK_STATE_NAMES[state];
}
/* Assert that a data_vio is its hash lock's agent and on the zone thread. */
static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
{
	assert_data_vio_in_hash_zone(data_vio);
	VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
			    "%s must be for the hash lock agent", where);
}
/*
 * Record a PBN lock as the hash lock's duplicate lock, taking a share of it
 * by bumping its holder count.
 */
static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
{
	VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
			    "hash lock must not already hold a duplicate lock");
	pbn_lock->holder_count += 1;
	hash_lock->duplicate_lock = pbn_lock;
}
/* Remove and return the first data_vio waiting on a hash lock, or NULL. */
static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
{
	return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
}
/*
 * Change the hash lock a data_vio holds, dropping its reference to (and
 * list membership in) the old lock, if any, and taking a counted reference
 * on the new lock, if any. Passing NULL for new_lock just releases.
 */
static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
{
	struct hash_lock *old_lock = data_vio->hash_lock;

	if (old_lock != NULL) {
		VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
				    "must have a hash zone when holding a hash lock");
		VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
				    "must be on a hash lock list when holding a hash lock");
		VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
				    "hash lock reference must be counted");
		if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
		    (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
			/*
			 * If the reference would drop to zero outside a
			 * terminal state, the lock is being abandoned early.
			 */
			VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
					    "hash locks should only become unreferenced in a terminal state, not state %s",
					    get_hash_lock_state_name(old_lock->state));
		}
		list_del_init(&data_vio->hash_lock_entry);
		old_lock->reference_count -= 1;
		data_vio->hash_lock = NULL;
	}
	if (new_lock != NULL) {
		/* Track membership and the high-water reference count. */
		list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_vios);
		new_lock->reference_count += 1;
		if (new_lock->max_references < new_lock->reference_count)
			new_lock->max_references = new_lock->reference_count;
		data_vio->hash_lock = new_lock;
	}
}
/*
 * Forward declarations: the hash lock state transitions call each other
 * cyclically, so they cannot simply be ordered to avoid these.
 */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
			   bool agent_is_done);
static void start_locking(struct hash_lock *lock, struct data_vio *agent);
static void start_writing(struct hash_lock *lock, struct data_vio *agent);
static void unlock_duplicate_pbn(struct vdo_completion *completion);
static void transfer_allocation_lock(struct data_vio *data_vio);
/*
 * Drop a data_vio out of its hash lock and send it on to complete; the
 * lock itself is released (and possibly pooled) by vdo_release_hash_lock.
 */
static void exit_hash_lock(struct data_vio *data_vio)
{
	vdo_release_hash_lock(data_vio);
	data_vio->vio.completion.callback = complete_data_vio;
	continue_data_vio(data_vio);
}
/*
 * Record a potential duplicate location on a data_vio; a zero-block source
 * means no duplicate exists.
 */
static void set_duplicate_location(struct data_vio *data_vio,
				   const struct zoned_pbn source)
{
	data_vio->duplicate = source;
	data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
}
/*
 * Retire the current agent, promoting the first waiter (if any) to be the
 * new agent and handing it the lock's duplicate location. The old agent
 * exits the lock. Returns the new agent, or NULL if there were no waiters.
 */
static struct data_vio *retire_lock_agent(struct hash_lock *lock)
{
	struct data_vio *old_agent = lock->agent;
	struct data_vio *new_agent = dequeue_lock_waiter(lock);

	lock->agent = new_agent;
	exit_hash_lock(old_agent);
	if (new_agent != NULL)
		set_duplicate_location(new_agent, lock->duplicate);
	return new_agent;
}
/*
 * Queue a data_vio to wait on a hash lock. If the agent is in WRITING and
 * its compression could be cancelled, the waiter jumps to the packer zone
 * to pull the agent back out so the whole lock is not stalled behind it.
 */
static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
{
	vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
	if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
		return;
	data_vio->compression.lock_holder = lock->agent;
	launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
}
/*
 * Waiter callback used when abandoning dedupe: send the waiting data_vio
 * off to write its own copy of the data.
 */
static void abort_waiter(struct vdo_waiter *waiter, void __always_unused *context)
{
	write_data_vio(vdo_waiter_as_data_vio(waiter));
}
/* Enter the terminal BYPASSING state and let the agent exit the lock. */
static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_BYPASSING;
	exit_hash_lock(agent);
}
/*
 * Clean up a hash lock after an error: move it to BYPASSING, abandon any
 * pending index update, send all waiters off to write their own copies,
 * and release the duplicate PBN lock if one is held. The first data_vio to
 * arrive becomes the agent for the cleanup; later ones just exit.
 */
void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
{
	struct hash_lock *lock = data_vio->hash_lock;

	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
		/* Cleanup is already underway or done. */
		exit_hash_lock(data_vio);
		return;
	}
	if (lock->agent == NULL) {
		lock->agent = data_vio;
	} else if (data_vio != lock->agent) {
		exit_hash_lock(data_vio);
		return;
	}
	lock->state = VDO_HASH_LOCK_BYPASSING;
	lock->update_advice = false;
	vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
	if (lock->duplicate_lock != NULL) {
		/* Drop the PBN lock in its zone; the agent completes there. */
		data_vio->duplicate = lock->duplicate;
		launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
		return;
	}
	lock->agent = NULL;
	data_vio->is_duplicate = false;
	exit_hash_lock(data_vio);
}
/*
 * Continue in the hash zone after the duplicate PBN lock has been released.
 * If the advice was never verified, fall back to writing; otherwise either
 * re-acquire the lock for new waiters or bypass if no one is left.
 */
static void finish_unlocking(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
			    "must have released the duplicate lock for the hash lock");
	if (!lock->verified) {
		start_writing(lock, agent);
		return;
	}
	/*
	 * Clear verified so a future LOCKING pass re-verifies: the duplicate
	 * may have been changed while the lock was not held.
	 */
	lock->verified = false;
	if (vdo_waitq_has_waiters(&lock->waiters)) {
		agent = retire_lock_agent(lock);
		start_locking(lock, agent);
		return;
	}
	start_bypassing(lock, agent);
}
/*
 * Release the hash lock's read lock on the duplicate PBN. Runs in the
 * duplicate block's physical zone. In BYPASSING (error cleanup) the agent
 * simply completes; otherwise control returns to the hash zone.
 */
static void unlock_duplicate_pbn(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_data_vio_in_duplicate_zone(agent);
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
			    "must have a duplicate lock to release");
	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
					   vdo_forget(lock->duplicate_lock));
	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
		complete_data_vio(completion);
		return;
	}
	launch_data_vio_hash_zone_callback(agent, finish_unlocking);
}
/* Enter UNLOCKING: send the agent to the duplicate zone to drop the PBN lock. */
static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_UNLOCKING;
	launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
}
/*
 * Return a dedupe context to its zone's available list, decrementing the
 * zone's count of active contexts.
 */
static void release_context(struct dedupe_context *context)
{
	struct hash_zone *zone = context->zone;

	WRITE_ONCE(zone->active, zone->active - 1);
	list_move(&context->list_entry, &zone->available);
}
/*
 * Release the agent's dedupe context after an index update, but only if the
 * query actually completed (a timed-out context is reclaimed elsewhere).
 */
static void process_update_result(struct data_vio *agent)
{
	struct dedupe_context *context = agent->dedupe_context;

	if ((context == NULL) ||
	    !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
		return;
	agent->dedupe_context = NULL;
	release_context(context);
}
/*
 * Continue in the hash zone after an index update completes. The advice is
 * now current, so clear update_advice, then either serve new waiters,
 * release the duplicate lock, or bypass.
 */
static void finish_updating(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);
	process_update_result(agent);
	lock->update_advice = false;
	if (vdo_waitq_has_waiters(&lock->waiters)) {
		/* New waiters arrived during the update; dedupe them. */
		start_deduping(lock, agent, true);
		return;
	}
	if (lock->duplicate_lock != NULL) {
		start_unlocking(lock, agent);
		return;
	}
	start_bypassing(lock, agent);
}
/* Forward declaration: the index query machinery is defined later. */
static void query_index(struct data_vio *data_vio, enum uds_request_type operation);

/*
 * Enter UPDATING: send an UDS_UPDATE to record the (verified) duplicate
 * location as the new advice for this hash.
 */
static void start_updating(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_UPDATING;
	VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
	VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
	agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
	set_data_vio_hash_zone_callback(agent, finish_updating);
	query_index(agent, UDS_UPDATE);
}
/*
 * Handle a data_vio that has finished deduplicating against the lock's
 * duplicate block. All but the last data_vio simply exit; the last one
 * becomes the agent and either updates the index advice or releases the
 * duplicate PBN lock.
 */
static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
{
	VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
			    "shouldn't have any lock waiters in DEDUPING");

	/* Only the last reference performs the lock's cleanup. */
	if (lock->reference_count > 1) {
		exit_hash_lock(data_vio);
		return;
	}

	lock->agent = data_vio;
	if (lock->update_advice)
		start_updating(lock, data_vio);
	else
		start_unlocking(lock, data_vio);
}
/*
 * Get a hash lock for the given record name from the zone's map, creating
 * one from the free pool if needed. If replace_lock is non-NULL, the new
 * lock unconditionally replaces it in the map (used when forking a lock).
 * On success *lock_ptr is the mapped lock; the borrowed pool lock is
 * returned if an existing mapping was found instead.
 */
static int __must_check acquire_lock(struct hash_zone *zone,
				     const struct uds_record_name *hash,
				     struct hash_lock *replace_lock,
				     struct hash_lock **lock_ptr)
{
	struct hash_lock *lock, *new_lock;
	int result;

	/* The pool is sized for every possible data_vio, so this can't fail. */
	result = VDO_ASSERT(!list_empty(&zone->lock_pool),
			    "never need to wait for a free hash lock");
	if (result != VDO_SUCCESS)
		return result;
	new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
	list_del_init(&new_lock->pool_node);
	new_lock->hash = *hash;
	/* Put with update=true only when replacing; lock gets any old mapping. */
	result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
				 new_lock, (replace_lock != NULL), (void **) &lock);
	if (result != VDO_SUCCESS) {
		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
		return result;
	}
	if (replace_lock != NULL) {
		VDO_ASSERT_LOG_ONLY(lock == replace_lock,
				    "old lock must have been in the lock map");
		VDO_ASSERT_LOG_ONLY(replace_lock->registered,
				    "old lock must have been marked registered");
		replace_lock->registered = false;
	}
	if (lock == replace_lock) {
		/* The new lock is now the registered one (lock == NULL also lands here when not replacing). */
		lock = new_lock;
		lock->registered = true;
	} else {
		/* An existing lock was found; return the unused pool lock. */
		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
	}
	*lock_ptr = lock;
	return VDO_SUCCESS;
}
/* Waiter callback: move a waiting data_vio onto the forked (new) hash lock. */
static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct hash_lock *new_lock = context;

	set_hash_lock(data_vio, new_lock);
	wait_on_hash_lock(new_lock, data_vio);
}
/*
 * Fork a hash lock whose duplicate can accept no more increments: replace
 * it in the map with a fresh lock, transfer all waiters to the new lock,
 * and have new_agent write a new copy of the data as the new lock's agent.
 * The new lock will provide fresh advice; the old lock's update is dropped.
 */
static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
{
	struct hash_lock *new_lock;
	int result;

	result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
			      &new_lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(new_agent, result);
		return;
	}
	old_lock->update_advice = false;
	new_lock->update_advice = true;
	set_hash_lock(new_agent, new_lock);
	new_lock->agent = new_agent;
	vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
	new_agent->is_duplicate = false;
	start_writing(new_lock, new_agent);
}
/*
 * Send a data_vio off to deduplicate against the lock's duplicate block.
 * Unless the caller already claimed an increment (has_claim), one must be
 * claimed from the duplicate PBN lock; if none remain, the lock is forked
 * with this data_vio as the new agent.
 */
static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
			  bool has_claim)
{
	if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		fork_hash_lock(lock, data_vio);
		return;
	}
	set_duplicate_location(data_vio, lock->duplicate);
	data_vio->new_mapped = data_vio->duplicate;
	update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
}
/*
 * Enter DEDUPING: point every data_vio in the lock at the verified
 * duplicate. If there is no duplicate PBN lock yet, the agent just wrote
 * the duplicate itself and its allocation lock is transferred to the hash
 * lock. The lock runs agent-less in this state.
 */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
			   bool agent_is_done)
{
	lock->state = VDO_HASH_LOCK_DEDUPING;
	if (lock->duplicate_lock == NULL) {
		VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
				    "compression must have shared a lock");
		VDO_ASSERT_LOG_ONLY(agent_is_done,
				    "agent must have written the new duplicate");
		transfer_allocation_lock(agent);
	}
	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
			    "duplicate_lock must be a PBN read lock");
	lock->agent = NULL;
	if (!agent_is_done) {
		/* The agent already holds a claimed increment. */
		launch_dedupe(lock, agent, true);
		agent = NULL;
	}
	while (vdo_waitq_has_waiters(&lock->waiters))
		launch_dedupe(lock, dequeue_lock_waiter(lock), false);
	if (agent_is_done) {
		/* The agent has no dedupe work of its own left to do. */
		finish_deduping(lock, agent);
	}
}
/*
 * Increment a statistics counter with WRITE_ONCE so concurrent readers see
 * a torn-free value; counters are only written from their owning thread.
 */
static inline void increment_stat(u64 *stat)
{
	WRITE_ONCE(*stat, *stat + 1);
}
/*
 * Continue in the hash zone after the duplicate block has been read and
 * compared. Count the advice as valid/stale exactly once per lock, then
 * either start deduping or (if unverified or no increments remain) release
 * the duplicate lock and fall back to writing.
 */
static void finish_verifying(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);
	lock->verified = agent->is_duplicate;
	if (!lock->verify_counted) {
		lock->verify_counted = true;
		if (lock->verified)
			increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
		else
			increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
	}
	/*
	 * Even verified advice is useless if the duplicate cannot accept
	 * another reference increment.
	 */
	if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		agent->is_duplicate = false;
		lock->verified = false;
	}
	if (lock->verified) {
		start_deduping(lock, agent, false);
	} else {
		/* The stored advice is wrong; replace it after unlocking. */
		lock->update_advice = true;
		start_unlocking(lock, agent);
	}
}
static bool blocks_equal(char *block1, char *block2)
{
int i;
for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
return false;
}
return true;
}
/*
 * CPU-thread callback: compare the agent's data with the block read into
 * scratch_block, then continue verification in the hash zone.
 */
static void verify_callback(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);

	agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
	launch_data_vio_hash_zone_callback(agent, finish_verifying);
}
/*
 * CPU-thread callback for a compressed duplicate: uncompress the fragment
 * into scratch_block, then compare. Any uncompress failure is treated as
 * "not a duplicate" rather than an error.
 */
static void uncompress_and_verify(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	int result;

	result = uncompress_data_vio(agent, agent->duplicate.state,
				     agent->scratch_block);
	if (result == VDO_SUCCESS) {
		verify_callback(completion);
		return;
	}
	agent->is_duplicate = false;
	launch_data_vio_hash_zone_callback(agent, finish_verifying);
}
/*
 * bio completion for the verification read. A read error just abandons
 * dedupe; otherwise continue on a CPU thread, uncompressing first if the
 * duplicate is a compressed fragment.
 */
static void verify_endio(struct bio *bio)
{
	struct data_vio *agent = vio_as_data_vio(bio->bi_private);
	int result = blk_status_to_errno(bio->bi_status);

	vdo_count_completed_bios(bio);
	if (result != VDO_SUCCESS) {
		agent->is_duplicate = false;
		launch_data_vio_hash_zone_callback(agent, finish_verifying);
		return;
	}
	if (vdo_is_state_compressed(agent->duplicate.state)) {
		launch_data_vio_cpu_callback(agent, uncompress_and_verify,
					     CPU_Q_COMPRESS_BLOCK_PRIORITY);
		return;
	}
	launch_data_vio_cpu_callback(agent, verify_callback,
				     CPU_Q_COMPLETE_READ_PRIORITY);
}
/*
 * Enter VERIFYING: read the advised duplicate block so it can be compared
 * against the agent's data. Compressed duplicates are read into the
 * compression block (the whole packed block); others into scratch_block.
 */
static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
{
	int result;
	struct vio *vio = &agent->vio;
	char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
			(char *) agent->compression.block :
			agent->scratch_block);

	lock->state = VDO_HASH_LOCK_VERIFYING;
	VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
	agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
	result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
			       agent->duplicate.pbn);
	if (result != VDO_SUCCESS) {
		/* Fail as if verification found no duplicate. */
		set_data_vio_hash_zone_callback(agent, finish_verifying);
		continue_data_vio_with_error(agent, result);
		return;
	}
	set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
	vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
}
/*
 * Continue in the hash zone after attempting to lock the advised duplicate
 * PBN. Stale or unlockable advice falls back to writing; unverified advice
 * proceeds to verification; verified advice with a claimable increment
 * goes straight to deduping.
 */
static void finish_locking(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);
	if (!agent->is_duplicate) {
		VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
				    "must not hold duplicate_lock if not flagged as a duplicate");
		/* The advice could not be used; write and refresh it. */
		increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
		lock->update_advice = true;
		start_writing(lock, agent);
		return;
	}
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
			    "must hold duplicate_lock if flagged as a duplicate");
	if (!lock->verified) {
		start_verifying(lock, agent);
		return;
	}
	if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
		/* The duplicate is full; drop it and get new advice. */
		agent->is_duplicate = false;
		lock->verified = false;
		lock->update_advice = true;
		start_unlocking(lock, agent);
		return;
	}
	start_deduping(lock, agent, false);
}
/*
 * Take a provisional reference on the duplicate block so it cannot be
 * freed while it is being shared. On failure the PBN lock is released and
 * the agent continues with the error; returns whether the reference was
 * acquired.
 */
static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
					  struct slab_depot *depot)
{
	struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
	int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);

	if (result == VDO_SUCCESS)
		return true;
	vdo_log_warning_strerror(result,
				 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
	agent->is_duplicate = false;
	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
					   agent->duplicate.pbn, lock);
	continue_data_vio_with_error(agent, result);
	return false;
}
/*
 * Attempt to take a read lock on the advised duplicate PBN. Runs in the
 * duplicate block's physical zone; continues back in the hash zone via
 * finish_locking. A full block (no increments left), a write-locked block,
 * or a failed lock attempt each abandon dedupe for this advice.
 */
static void lock_duplicate_pbn(struct vdo_completion *completion)
{
	unsigned int increment_limit;
	struct pbn_lock *lock;
	int result;
	struct data_vio *agent = as_data_vio(completion);
	struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
	struct physical_zone *zone = agent->duplicate.zone;

	assert_data_vio_in_duplicate_zone(agent);
	set_data_vio_hash_zone_callback(agent, finish_locking);
	/* How many more references the duplicate block can take. */
	increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
	if (increment_limit == 0) {
		agent->is_duplicate = false;
		continue_data_vio(agent);
		return;
	}
	result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
						    VIO_READ_LOCK, &lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(agent, result);
		return;
	}
	if (!vdo_is_pbn_read_lock(lock)) {
		/* Someone holds a write lock on the block; can't dedupe. */
		agent->is_duplicate = false;
		continue_data_vio(agent);
		return;
	}
	if (lock->holder_count == 0) {
		/* First holder: pin the block and record its capacity. */
		if (!acquire_provisional_reference(agent, lock, depot))
			return;
		lock->increment_limit = increment_limit;
	}
	set_duplicate_lock(agent->hash_lock, lock);
	continue_data_vio(agent);
}
/* Enter LOCKING: send the agent to the duplicate zone to lock the PBN. */
static void start_locking(struct hash_lock *lock, struct data_vio *agent)
{
	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
			    "must not acquire a duplicate lock when already holding it");
	lock->state = VDO_HASH_LOCK_LOCKING;
	agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
	launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
}
/*
 * Handle the agent having written (or compressed) its data: the written
 * location becomes the lock's verified duplicate. Then serve waiters,
 * update the index if needed, release the shared compressed-write lock,
 * or bypass, in that order of preference.
 */
static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
{
	lock->duplicate = agent->new_mapped;
	lock->verified = true;
	if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
		/* Compressed fragments get fresh advice recorded. */
		lock->update_advice = true;
	}
	if (vdo_waitq_has_waiters(&lock->waiters)) {
		start_deduping(lock, agent, true);
		return;
	}
	if (lock->update_advice) {
		start_updating(lock, agent);
	} else if (lock->duplicate_lock != NULL) {
		/* A compressed write shared its PBN lock; release it. */
		set_duplicate_location(agent, lock->duplicate);
		start_unlocking(lock, agent);
	} else {
		start_bypassing(lock, agent);
	}
}
/*
 * Find a waiter that has an allocation and promote it to agent, demoting
 * the current (allocation-less) agent to a waiter. Waiters skipped during
 * the search are re-queued in their original order. Returns the new agent,
 * or the old one if no waiter has an allocation.
 */
static struct data_vio *select_writing_agent(struct hash_lock *lock)
{
	struct vdo_wait_queue temp_queue;
	struct data_vio *data_vio;

	vdo_waitq_init(&temp_queue);
	/* Move allocation-less waiters aside until one with space is found. */
	while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
	       !data_vio_has_allocation(data_vio)) {
		vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
	}
	if (data_vio != NULL) {
		/* Swap roles: old agent waits, chosen waiter leads. */
		vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
		vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
		lock->agent = data_vio;
	} else {
		data_vio = lock->agent;
	}
	/* Restore the skipped waiters ahead of anything added above. */
	vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
	return data_vio;
}
/*
 * Enter WRITING: have an agent with an allocation write the data. If the
 * current agent has no allocation, try to find a waiter that does; if no
 * one does, the lock fails with VDO_NO_SPACE. Compression is skipped when
 * others are already waiting to dedupe against the result.
 */
static void start_writing(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_WRITING;
	if (!data_vio_has_allocation(agent)) {
		agent = select_writing_agent(lock);
		if (!data_vio_has_allocation(agent)) {
			continue_data_vio_with_error(agent, VDO_NO_SPACE);
			return;
		}
	}
	if (vdo_waitq_has_waiters(&lock->waiters))
		cancel_data_vio_compression(agent);
	launch_compress_data_vio(agent);
}
/*
 * Decode dedupe advice from a completed index request into the data_vio's
 * duplicate field: one version byte, one mapping-state byte, and a
 * little-endian 64-bit PBN. Returns true only if the request found valid,
 * in-range advice; bad advice is logged and counted.
 */
static bool decode_uds_advice(struct dedupe_context *context)
{
	const struct uds_request *request = &context->request;
	struct data_vio *data_vio = context->requestor;
	size_t offset = 0;
	const struct uds_record_data *encoding = &request->old_metadata;
	struct vdo *vdo = vdo_from_data_vio(data_vio);
	struct zoned_pbn *advice = &data_vio->duplicate;
	u8 version;
	int result;

	if ((request->status != UDS_SUCCESS) || !request->found)
		return false;
	version = encoding->data[offset++];
	if (version != UDS_ADVICE_VERSION) {
		vdo_log_error("invalid UDS advice version code %u", version);
		return false;
	}
	advice->state = encoding->data[offset++];
	advice->pbn = get_unaligned_le64(&encoding->data[offset]);
	offset += sizeof(u64);
	BUG_ON(offset != UDS_ADVICE_SIZE);
	/* An unmapped or zero-block location is never valid advice. */
	if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
		vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
			      (unsigned long long) advice->pbn, advice->state,
			      (unsigned long long) data_vio->logical.lbn);
		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
		return false;
	}
	result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
	if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
		vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
			      (unsigned long long) advice->pbn,
			      (unsigned long long) data_vio->logical.lbn);
		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
		return false;
	}
	return true;
}
/*
 * Process the result of an index query: if the query completed (rather
 * than timing out), decode the advice into is_duplicate and release the
 * context for reuse.
 */
static void process_query_result(struct data_vio *agent)
{
	struct dedupe_context *context = agent->dedupe_context;

	if (context == NULL)
		return;
	if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
		agent->is_duplicate = decode_uds_advice(context);
		agent->dedupe_context = NULL;
		release_context(context);
	}
}
/*
 * Continue in the hash zone after an index query: advice means attempt to
 * lock the duplicate; no advice means write, updating the index afterward
 * only if the write cannot post fresh advice itself (no allocation).
 */
static void finish_querying(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct hash_lock *lock = agent->hash_lock;

	assert_hash_lock_agent(agent, __func__);
	process_query_result(agent);
	if (agent->is_duplicate) {
		lock->duplicate = agent->duplicate;
		start_locking(lock, agent);
	} else {
		lock->update_advice = !data_vio_has_allocation(agent);
		start_writing(lock, agent);
	}
}
/*
 * Enter QUERYING as the lock's first state: ask the index for advice.
 * A data_vio with an allocation can post its own location (UDS_POST);
 * otherwise it can only query.
 */
static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
{
	lock->agent = data_vio;
	lock->state = VDO_HASH_LOCK_QUERYING;
	data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
	set_data_vio_hash_zone_callback(data_vio, finish_querying);
	query_index(data_vio,
		    (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
}
/*
 * Log an impossible hash lock state and fail the data_vio with a lock
 * error; used as the catch-all for unhandled state machine cases.
 */
static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
{
	VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
			    get_hash_lock_state_name(lock->state));
	continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
}
/*
 * Re-enter the hash lock state machine after an asynchronous step. Only
 * WRITING, DEDUPING and BYPASSING can legitimately be continued here; the
 * other states use dedicated callbacks and are reported as bogus.
 */
void vdo_continue_hash_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct hash_lock *lock = data_vio->hash_lock;

	switch (lock->state) {
	case VDO_HASH_LOCK_WRITING:
		VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
				    "only the lock agent may continue the lock");
		finish_writing(lock, data_vio);
		break;
	case VDO_HASH_LOCK_DEDUPING:
		finish_deduping(lock, data_vio);
		break;
	case VDO_HASH_LOCK_BYPASSING:
		/* The lock has given up; each data_vio finishes alone. */
		exit_hash_lock(data_vio);
		break;
	case VDO_HASH_LOCK_INITIALIZING:
	case VDO_HASH_LOCK_QUERYING:
	case VDO_HASH_LOCK_UPDATING:
	case VDO_HASH_LOCK_LOCKING:
	case VDO_HASH_LOCK_VERIFYING:
	case VDO_HASH_LOCK_UNLOCKING:
		/* These states have dedicated callbacks; landing here is a bug. */
		report_bogus_lock_state(lock, data_vio);
		break;
	default:
		report_bogus_lock_state(lock, data_vio);
	}
}
/*
 * Detect a true hash collision: compare the candidate's data against a
 * data_vio already in the lock. Only possible when the lock already has a
 * member whose data is resident. Updates collision/match statistics.
 */
static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
{
	struct data_vio *lock_holder;
	struct hash_zone *zone;
	bool collides;

	if (list_empty(&lock->duplicate_vios))
		return false;
	lock_holder = list_first_entry(&lock->duplicate_vios, struct data_vio,
				       hash_lock_entry);
	zone = candidate->hash_zone;
	collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
	if (collides)
		increment_stat(&zone->statistics.concurrent_hash_collisions);
	else
		increment_stat(&zone->statistics.concurrent_data_matches);
	return collides;
}
/*
 * Check that a data_vio is eligible to acquire a hash lock: it must hold
 * no hash lock, be on no hash lock list, and hold no recovery lock.
 * Returns VDO_SUCCESS or the first failed assertion's result.
 */
static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
{
	int result;

	result = VDO_ASSERT(data_vio->hash_lock == NULL,
			    "must not already hold a hash lock");
	if (result != VDO_SUCCESS)
		return result;
	result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
			    "must not already be a member of a hash lock list");
	if (result != VDO_SUCCESS)
		return result;
	return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
			  "must not hold a recovery lock when getting a hash lock");
}
/*
 * Acquire (or join) the hash lock for a data_vio's record name and act
 * according to the lock's current state: start the query on a fresh lock,
 * wait behind a busy agent, dedupe immediately, or — on bypass or a true
 * hash collision — just write the data independently.
 */
void vdo_acquire_hash_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct hash_lock *lock;
	int result;

	assert_data_vio_in_hash_zone(data_vio);
	result = assert_hash_lock_preconditions(data_vio);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}
	result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}
	if (is_hash_collision(lock, data_vio)) {
		/* Same hash, different data: cannot share this lock. */
		write_data_vio(data_vio);
		return;
	}
	set_hash_lock(data_vio, lock);
	switch (lock->state) {
	case VDO_HASH_LOCK_INITIALIZING:
		start_querying(lock, data_vio);
		return;
	case VDO_HASH_LOCK_QUERYING:
	case VDO_HASH_LOCK_WRITING:
	case VDO_HASH_LOCK_UPDATING:
	case VDO_HASH_LOCK_LOCKING:
	case VDO_HASH_LOCK_VERIFYING:
	case VDO_HASH_LOCK_UNLOCKING:
		/* The agent is busy; wait for it to finish this step. */
		wait_on_hash_lock(lock, data_vio);
		return;
	case VDO_HASH_LOCK_BYPASSING:
		/* The lock has given up on dedupe; go it alone. */
		vdo_release_hash_lock(data_vio);
		write_data_vio(data_vio);
		return;
	case VDO_HASH_LOCK_DEDUPING:
		launch_dedupe(lock, data_vio, false);
		return;
	default:
		report_bogus_lock_state(lock, data_vio);
	}
}
/*
 * Release a data_vio's reference on its hash lock. When the last reference
 * drops, unregister the lock from the zone's map (if registered), verify
 * the lock is fully quiescent, and return it to the free pool.
 */
void vdo_release_hash_lock(struct data_vio *data_vio)
{
	u64 lock_key;
	struct hash_lock *lock = data_vio->hash_lock;
	struct hash_zone *zone = data_vio->hash_zone;

	if (lock == NULL)
		return;
	set_hash_lock(data_vio, NULL);
	if (lock->reference_count > 0) {
		/* The lock is still in use by other data_vios. */
		return;
	}
	lock_key = hash_lock_key(lock);
	if (lock->registered) {
		struct hash_lock *removed;

		removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
		VDO_ASSERT_LOG_ONLY(lock == removed,
				    "hash lock being released must have been mapped");
	} else {
		/* A forked lock was replaced in the map by its successor. */
		VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
				    "unregistered hash lock must not be in the lock map");
	}
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
			    "hash lock returned to zone must have no waiters");
	VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
			    "hash lock returned to zone must not reference a PBN lock");
	VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
			    "returned hash lock must not be in use with state %s",
			    get_hash_lock_state_name(lock->state));
	VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
			    "hash lock returned to zone must not be in a pool list");
	VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_vios),
			    "hash lock returned to zone must not reference DataVIOs");
	return_hash_lock_to_pool(zone, lock);
}
/*
 * Transfer the agent's (already-downgraded) allocation PBN lock to its
 * hash lock as the duplicate lock, making the newly written block the
 * lock's duplicate location.
 */
static void transfer_allocation_lock(struct data_vio *data_vio)
{
	struct allocation *allocation = &data_vio->allocation;
	struct hash_lock *hash_lock = data_vio->hash_lock;

	VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
			    "transferred lock must be for the block written");
	allocation->pbn = VDO_ZERO_BLOCK;
	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
			    "must have downgraded the allocation lock before transfer");
	hash_lock->duplicate = data_vio->new_mapped;
	data_vio->duplicate = data_vio->new_mapped;
	/* The hash lock now owns the PBN lock. */
	hash_lock->duplicate_lock = vdo_forget(allocation->lock);
}
/*
 * Share the PBN lock on a just-written compressed block with the writer's
 * hash lock: downgrade it to a read lock if needed, record the compressed
 * location as the duplicate, and claim the initial reference increment.
 */
void vdo_share_compressed_write_lock(struct data_vio *data_vio,
				     struct pbn_lock *pbn_lock)
{
	bool claimed;

	VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
			    "a duplicate PBN lock should not exist when writing");
	VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
			    "lock transfer must be for a compressed write");
	assert_data_vio_in_new_mapped_zone(data_vio);
	if (!vdo_is_pbn_read_lock(pbn_lock))
		vdo_downgrade_pbn_write_lock(pbn_lock, true);
	data_vio->duplicate = data_vio->new_mapped;
	data_vio->hash_lock->duplicate = data_vio->new_mapped;
	set_duplicate_lock(data_vio->hash_lock, pbn_lock);
	/* A freshly written block always has at least one increment free. */
	claimed = vdo_claim_pbn_lock_increment(pbn_lock);
	VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
}
/* Work-queue start hook: register the UDS thread as an allocating thread. */
static void start_uds_queue(void *ptr)
{
	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());

	vdo_register_allocating_thread(&thread->allocating_thread, NULL);
}
/* Work-queue finish hook: undo the registration made in start_uds_queue. */
static void finish_uds_queue(void *ptr __always_unused)
{
	vdo_unregister_allocating_thread();
}
/*
 * Close the UDS index session. Called with zones->lock held; the lock is
 * dropped around the blocking uds_close_index() call and reacquired before
 * the resulting state is recorded.
 */
static void close_index(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	int result;

	zones->index_state = IS_CHANGING;
	spin_unlock(&zones->lock);
	result = uds_close_index(zones->index_session);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error closing index");
	spin_lock(&zones->lock);
	zones->index_state = IS_CLOSED;
	zones->error_flag |= result != UDS_SUCCESS;
}
/*
 * Open (load or create) the UDS index session. Called with zones->lock
 * held; the lock is dropped around the blocking uds_open_index() call. A
 * load that fails with -ENOENT schedules a create on the next attempt;
 * any other failure drives the target state to closed with error_flag set.
 */
static void open_index(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	int result;
	bool create_flag = zones->create_flag;

	zones->create_flag = false;
	zones->index_state = IS_CHANGING;
	zones->error_flag = false;
	spin_unlock(&zones->lock);
	result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
				&zones->parameters, zones->index_session);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error opening index");
	spin_lock(&zones->lock);
	if (!create_flag) {
		switch (result) {
		case -ENOENT:
			/* No index on disk: retry as a create. */
			zones->index_state = IS_CLOSED;
			zones->create_flag = true;
			return;
		default:
			break;
		}
	}
	if (result == UDS_SUCCESS) {
		zones->index_state = IS_OPENED;
	} else {
		zones->index_state = IS_CLOSED;
		zones->index_target = IS_CLOSED;
		zones->error_flag = true;
		/* Drop the lock only to log; reacquire for the caller. */
		spin_unlock(&zones->lock);
		vdo_log_info("Setting UDS index target state to error");
		spin_lock(&zones->lock);
	}
}
/**
 * change_dedupe_state() - Completion callback driving the index open/close state
 *                         machine.
 * @completion: The hash zones completion.
 *
 * Loops until the index state matches the target (and no create is pending), or the
 * zones leave normal operation. close_index() and open_index() both drop and reacquire
 * zones->lock, so the loop condition is re-evaluated against fresh state each pass.
 */
static void change_dedupe_state(struct vdo_completion *completion)
{
	struct hash_zones *zones = as_hash_zones(completion);

	spin_lock(&zones->lock);
	while (vdo_is_state_normal(&zones->state) &&
	       ((zones->index_state != zones->index_target) || zones->create_flag)) {
		if (zones->index_state == IS_OPENED)
			close_index(zones);
		else
			open_index(zones);
	}
	/* Allow launch_dedupe_state_change() to start another transition. */
	zones->changing = false;
	spin_unlock(&zones->lock);
}
static void start_expiration_timer(struct dedupe_context *context)
{
u64 start_time = context->submission_jiffies;
u64 end_time;
if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
DEDUPE_QUERY_TIMER_RUNNING))
return;
end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
jiffies + vdo_dedupe_index_min_timer_jiffies);
mod_timer(&context->zone->timer, end_time);
}
/**
 * report_dedupe_timeouts() - Record and (rate-limited) log dedupe request timeouts.
 * @zones: The hash zones.
 * @timeouts: The number of newly timed-out requests.
 *
 * The running total lives in the atomic zones->timeouts; each log message covers every
 * timeout not yet reported, so counts suppressed by the rate limiter are included the
 * next time a message is allowed.
 */
static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
{
	atomic64_add(timeouts, &zones->timeouts);
	/* The lock serializes readers/writers of reported_timeouts. */
	spin_lock(&zones->lock);
	if (__ratelimit(&zones->ratelimiter)) {
		u64 unreported = atomic64_read(&zones->timeouts);

		unreported -= zones->reported_timeouts;
		vdo_log_debug("UDS index timeout on %llu requests",
			      (unsigned long long) unreported);
		zones->reported_timeouts += unreported;
	}
	spin_unlock(&zones->lock);
}
/**
 * initialize_index() - Set up the UDS index session, parameters, and dedupe queue.
 * @vdo: The vdo owning the index.
 * @zones: The hash zones being initialized.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
{
	int result;
	off_t uds_offset;
	struct volume_geometry geometry = vdo->geometry;
	static const struct vdo_work_queue_type uds_queue_type = {
		.start = start_uds_queue,
		.finish = finish_uds_queue,
		.max_priority = UDS_Q_MAX_PRIORITY,
		.default_priority = UDS_Q_PRIORITY,
	};

	/* Re-apply the module parameters so they are clamped to their legal ranges. */
	vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
	vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);

	spin_lock_init(&zones->lock);

	/* Rate-limit timeout reporting; also emit a message on ratelimit release. */
	ratelimit_default_init(&zones->ratelimiter);
	ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);

	/* Byte offset of the index region, adjusted by the geometry's bio offset. */
	uds_offset = ((vdo_get_index_region_start(geometry) -
		       geometry.bio_offset) * VDO_BLOCK_SIZE);
	zones->parameters = (struct uds_parameters) {
		.bdev = vdo->device_config->owned_device->bdev,
		.offset = uds_offset,
		.size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
		.memory_size = geometry.index_config.mem,
		.sparse = geometry.index_config.sparse,
		.nonce = (u64) geometry.nonce,
	};

	result = uds_create_index_session(&zones->index_session);
	if (result != UDS_SUCCESS)
		return result;

	result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
				 1, NULL);
	if (result != VDO_SUCCESS) {
		/* Tear down the session created above before failing. */
		uds_destroy_index_session(vdo_forget(zones->index_session));
		vdo_log_error("UDS index queue initialization failed (%d)", result);
		return result;
	}

	/* The completion that runs change_dedupe_state() on the dedupe thread. */
	vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
	vdo_set_completion_callback(&zones->completion, change_dedupe_state,
				    vdo->thread_config.dedupe_thread);
	return VDO_SUCCESS;
}
/**
 * finish_index_operation() - UDS callback invoked when an index request completes.
 * @request: The completed request, embedded in a dedupe_context.
 *
 * In the normal case the context is still PENDING and its requestor is resumed. If the
 * request had already been timed out, the context is instead marked
 * TIMED_OUT_COMPLETE and queued so the zone can recycle it later; the requestor has
 * already moved on without advice.
 */
static void finish_index_operation(struct uds_request *request)
{
	struct dedupe_context *context = container_of(request, struct dedupe_context,
						      request);

	if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
				 DEDUPE_CONTEXT_COMPLETE)) {
		continue_data_vio(context->requestor);
		return;
	}

	/*
	 * The only other state expected here is TIMED_OUT; log (but don't crash) if
	 * the transition to TIMED_OUT_COMPLETE fails.
	 */
	if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
				  DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
		VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
				    atomic_read(&context->state));
	}

	/* Queue the context so the zone thread can safely recycle it. */
	vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
}
/**
 * check_for_drain_complete() - Complete a hash zone drain once it is safe to do so.
 * @zone: The zone to check.
 *
 * If the zone is draining, stops the expiration timer, recycles any contexts whose
 * timed-out requests have since completed, and finishes the drain once no contexts
 * remain active.
 */
static void check_for_drain_complete(struct hash_zone *zone)
{
	data_vio_count_t recycled = 0;

	if (!vdo_is_state_draining(&zone->state))
		return;

	if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
	    change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
			       DEDUPE_QUERY_TIMER_IDLE)) {
		timer_delete_sync(&zone->timer);
	} else {
		/*
		 * The timer has fired; its callback will call back into this check
		 * (see timeout_index_operations_callback).
		 */
		return;
	}

	/* Reclaim contexts whose UDS requests completed after their timeout. */
	for (;;) {
		struct dedupe_context *context;
		struct funnel_queue_entry *entry;

		entry = vdo_funnel_queue_poll(zone->timed_out_complete);
		if (entry == NULL)
			break;

		context = container_of(entry, struct dedupe_context, queue_entry);
		atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
		list_add(&context->list_entry, &zone->available);
		recycled++;
	}

	if (recycled > 0)
		WRITE_ONCE(zone->active, zone->active - recycled);
	VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
	vdo_finish_draining(&zone->state);
}
/**
 * timeout_index_operations_callback() - Expire overdue dedupe requests on a zone.
 * @completion: The hash zone completion.
 *
 * Runs on the zone's thread after the expiration timer fires. Walks the pending list
 * (kept in submission order by query_index()), timing out every context older than the
 * cutoff and resuming its requestor without dedupe advice. The first context still
 * within the timeout re-arms the timer and ends the scan.
 */
static void timeout_index_operations_callback(struct vdo_completion *completion)
{
	struct dedupe_context *context, *tmp;
	struct hash_zone *zone = as_hash_zone(completion);
	u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
	unsigned long cutoff = jiffies - timeout_jiffies;
	unsigned int timed_out = 0;

	atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
	list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
		if (cutoff <= context->submission_jiffies) {
			/*
			 * This entry is still young enough; everything after it was
			 * submitted later, so stop and re-arm the timer for it.
			 */
			start_expiration_timer(context);
			break;
		}

		/*
		 * Only a PENDING context may be timed out; if the UDS callback got
		 * here first, leave the context for it to finish.
		 */
		if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
					  DEDUPE_CONTEXT_TIMED_OUT)) {
			continue;
		}

		/* Detach the context and let the requestor proceed without advice. */
		list_del_init(&context->list_entry);
		context->requestor->dedupe_context = NULL;
		continue_data_vio(context->requestor);
		timed_out++;
	}

	if (timed_out > 0)
		report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
	/* A drain may have been waiting on the timer we just retired. */
	check_for_drain_complete(zone);
}
static void timeout_index_operations(struct timer_list *t)
{
struct hash_zone *zone = timer_container_of(zone, t, timer);
if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
DEDUPE_QUERY_TIMER_FIRED))
vdo_launch_completion(&zone->completion);
}
/**
 * initialize_zone() - Initialize a single hash zone.
 * @vdo: The vdo owning the zone.
 * @zones: The hash_zones containing the zone.
 * @zone_number: The index of the zone to initialize.
 *
 * Sets up the zone's hash lock map, hash lock pool, dedupe contexts, expiration timer,
 * and thread. Partially initialized state is released by vdo_free_hash_zones().
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
					zone_count_t zone_number)
{
	int result;
	data_vio_count_t i;
	struct hash_zone *zone = &zones->zones[zone_number];

	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
	if (result != VDO_SUCCESS)
		return result;

	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
	zone->zone_number = zone_number;
	zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
	/* This completion runs the request-timeout handler on the zone's thread. */
	vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
	vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
				    zone->thread_id);
	INIT_LIST_HEAD(&zone->lock_pool);
	result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
			      &zone->lock_array);
	if (result != VDO_SUCCESS)
		return result;

	/* Seed the pool with every lock in the array. */
	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
		return_hash_lock_to_pool(zone, &zone->lock_array[i]);

	INIT_LIST_HEAD(&zone->available);
	INIT_LIST_HEAD(&zone->pending);
	result = vdo_make_funnel_queue(&zone->timed_out_complete);
	if (result != VDO_SUCCESS)
		return result;

	timer_setup(&zone->timer, timeout_index_operations, 0);

	/* All dedupe contexts start out idle on the available list. */
	for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
		struct dedupe_context *context = &zone->contexts[i];

		context->zone = zone;
		context->request.callback = finish_index_operation;
		context->request.session = zones->index_session;
		list_add(&context->list_entry, &zone->available);
	}

	return vdo_make_default_thread(vdo, zone->thread_id);
}
/* Implements vdo_zone_thread_getter_fn: map a hash zone number to its thread id. */
static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
{
	const struct hash_zones *zones = context;

	return zones->zones[zone_number].thread_id;
}
/**
 * vdo_make_hash_zones() - Create the hash zones along with their UDS index.
 * @vdo: The vdo to which the zones will belong.
 * @zones_ptr: A pointer to hold the new hash_zones; untouched when zone_count is 0.
 *
 * Return: VDO_SUCCESS or an error code.
 */
int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
{
	int result;
	struct hash_zones *zones;
	zone_count_t z;
	zone_count_t zone_count = vdo->thread_config.hash_zone_count;

	/* A configuration with no hash zones needs no structure at all. */
	if (zone_count == 0)
		return VDO_SUCCESS;

	result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
				       __func__, &zones);
	if (result != VDO_SUCCESS)
		return result;

	result = initialize_index(vdo, zones);
	if (result != VDO_SUCCESS) {
		vdo_free(zones);
		return result;
	}

	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);

	/* zone_count is set first so that cleanup can iterate the zones. */
	zones->zone_count = zone_count;
	for (z = 0; z < zone_count; z++) {
		result = initialize_zone(vdo, zones, z);
		if (result != VDO_SUCCESS) {
			/* Releases the index and any partially initialized zones. */
			vdo_free_hash_zones(zones);
			return result;
		}
	}

	result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
					 vdo->thread_config.admin_thread, zones, NULL,
					 vdo, &zones->manager);
	if (result != VDO_SUCCESS) {
		vdo_free_hash_zones(zones);
		return result;
	}

	*zones_ptr = zones;
	return VDO_SUCCESS;
}
/* Tear down the UDS index session; a no-op when @zones is NULL. */
void vdo_finish_dedupe_index(struct hash_zones *zones)
{
	if (zones != NULL)
		uds_destroy_index_session(vdo_forget(zones->index_session));
}
/**
 * vdo_free_hash_zones() - Free the hash zones and all of their resources.
 * @zones: The zones to free; may be NULL or only partially initialized.
 */
void vdo_free_hash_zones(struct hash_zones *zones)
{
	zone_count_t i;

	if (zones == NULL)
		return;

	vdo_free(vdo_forget(zones->manager));

	for (i = 0; i < zones->zone_count; i++) {
		struct hash_zone *zone = &zones->zones[i];

		/* Each helper tolerates NULL, so partial initialization is safe. */
		vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
		vdo_int_map_free(vdo_forget(zone->hash_lock_map));
		vdo_free(vdo_forget(zone->lock_array));
	}

	/* Only destroy the index session if it was actually created. */
	if (zones->index_session != NULL)
		vdo_finish_dedupe_index(zones);

	ratelimit_state_exit(&zones->ratelimiter);
	vdo_free(zones);
}
/**
 * initiate_suspend_index() - Initiate a suspend, possibly saving the index.
 * @state: The hash zones admin state.
 *
 * Implements vdo_admin_initiator_fn. Suspends the index session (saving it when the
 * zones are in a saving state) unless the index is closed, then finishes the drain.
 */
static void initiate_suspend_index(struct admin_state *state)
{
	struct hash_zones *zones = container_of(state, struct hash_zones, state);
	enum index_state index_state;

	/* Snapshot the index state under the lock; the suspend itself blocks. */
	spin_lock(&zones->lock);
	index_state = zones->index_state;
	spin_unlock(&zones->lock);

	if (index_state != IS_CLOSED) {
		bool save = vdo_is_state_saving(&zones->state);
		int result;

		result = uds_suspend_index_session(zones->index_session, save);
		if (result != UDS_SUCCESS)
			vdo_log_error_strerror(result, "Error suspending dedupe index");
	}

	vdo_finish_draining(state);
}
/**
 * suspend_index() - Suspend the UDS index prior to draining hash zones.
 * @context: The hash_zones.
 * @completion: The completion to notify when the suspend is complete.
 *
 * Implements vdo_action_preamble_fn: starts draining the zones' admin state, with
 * initiate_suspend_index() doing the actual index suspension.
 */
static void suspend_index(void *context, struct vdo_completion *completion)
{
	struct hash_zones *zones = context;

	vdo_start_draining(&zones->state,
			   vdo_get_current_manager_operation(zones->manager), completion,
			   initiate_suspend_index);
}
/* Implements vdo_admin_initiator_fn: kick off the drain check for a hash zone. */
static void initiate_drain(struct admin_state *state)
{
	check_for_drain_complete(container_of(state, struct hash_zone, state));
}
/**
 * drain_hash_zone() - Start draining a single hash zone.
 * @context: The hash_zones.
 * @zone_number: The zone to drain.
 * @parent: The completion to notify when the zone is drained.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_hash_zone(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct hash_zones *zones = context;

	vdo_start_draining(&zones->zones[zone_number].state,
			   vdo_get_current_manager_operation(zones->manager), parent,
			   initiate_drain);
}
/**
 * vdo_drain_hash_zones() - Drain all hash zones.
 * @zones: The zones to drain.
 * @parent: The parent completion to notify when the zones are drained; the type of
 *          suspend is taken from the parent's vdo.
 */
void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
{
	vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
			       drain_hash_zone, NULL, parent);
}
/*
 * Kick off an index state change if one is needed and none is already in progress.
 * Must be called with zones->lock held.
 */
static void launch_dedupe_state_change(struct hash_zones *zones)
	__must_hold(&zones->lock)
{
	/* Nothing to do while a change is in flight or the zones are not operating. */
	if (zones->changing || !vdo_is_state_normal(&zones->state))
		return;

	/* No work unless a create was requested or the target state differs. */
	if (!zones->create_flag && (zones->index_state == zones->index_target))
		return;

	zones->changing = true;
	vdo_launch_completion(&zones->completion);
}
/**
 * resume_index() - Resume the UDS index prior to resuming hash zones.
 * @context: The hash_zones.
 * @parent: The completion to notify when done.
 *
 * Implements vdo_action_preamble_fn. Refreshes the backing device (which may have
 * changed across a suspend), resumes the index session, and re-derives the index
 * target and dedupe flag from the current device configuration.
 */
static void resume_index(void *context, struct vdo_completion *parent)
{
	struct hash_zones *zones = context;
	struct device_config *config = parent->vdo->device_config;
	int result;

	/* The device may have been replaced while suspended. */
	zones->parameters.bdev = config->owned_device->bdev;
	result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "Error resuming dedupe index");

	spin_lock(&zones->lock);
	vdo_resume_if_quiescent(&zones->state);

	/* The table line decides whether dedupe should be on after the resume. */
	if (config->deduplication) {
		zones->index_target = IS_OPENED;
		WRITE_ONCE(zones->dedupe_flag, true);
	} else {
		zones->index_target = IS_CLOSED;
	}

	launch_dedupe_state_change(zones);
	spin_unlock(&zones->lock);

	vdo_finish_completion(parent);
}
/* Implements vdo_zone_action_fn: resume a single quiescent hash zone. */
static void resume_hash_zone(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct hash_zones *zones = context;
	struct hash_zone *zone = &zones->zones[zone_number];

	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
}
/**
 * vdo_resume_hash_zones() - Resume a set of hash zones.
 * @zones: The hash zones to resume.
 * @parent: The object to notify when the zones have resumed.
 *
 * In read-only mode the resume is skipped entirely and the parent is simply launched.
 */
void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
{
	if (vdo_is_read_only(parent->vdo)) {
		vdo_launch_completion(parent);
		return;
	}

	vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
			       resume_hash_zone, NULL, parent);
}
/**
 * get_hash_zone_statistics() - Add one zone's lock statistics to a running tally.
 * @zone: The hash zone to tally.
 * @tally: The accumulator (sums across zones).
 *
 * Counters are sampled with READ_ONCE since they are updated on the zone's thread.
 */
static void get_hash_zone_statistics(const struct hash_zone *zone,
				     struct hash_lock_statistics *tally)
{
	const struct hash_lock_statistics *stats = &zone->statistics;

	tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
	tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
	tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
	tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
	/* The number of in-flight dedupe queries is the zone's active context count. */
	tally->curr_dedupe_queries += READ_ONCE(zone->active);
}
/**
 * get_index_statistics() - Copy the UDS index statistics into a vdo stats structure.
 * @zones: The hash zones owning the index.
 * @stats: The structure to fill; left untouched unless the index is open and the
 *         stats query succeeds.
 */
static void get_index_statistics(struct hash_zones *zones,
				 struct index_statistics *stats)
{
	enum index_state state;
	struct uds_index_stats index_stats;
	int result;

	/* Snapshot the state under the lock; the stats call itself is unlocked. */
	spin_lock(&zones->lock);
	state = zones->index_state;
	spin_unlock(&zones->lock);

	if (state != IS_OPENED)
		return;

	result = uds_get_index_session_stats(zones->index_session, &index_stats);
	if (result != UDS_SUCCESS) {
		vdo_log_error_strerror(result, "Error reading index stats");
		return;
	}

	stats->entries_indexed = index_stats.entries_indexed;
	stats->posts_found = index_stats.posts_found;
	stats->posts_not_found = index_stats.posts_not_found;
	stats->queries_found = index_stats.queries_found;
	stats->queries_not_found = index_stats.queries_not_found;
	stats->updates_found = index_stats.updates_found;
	stats->updates_not_found = index_stats.updates_not_found;
	stats->entries_discarded = index_stats.entries_discarded;
}
/**
 * vdo_get_dedupe_statistics() - Tally hash lock, index, and timeout statistics.
 * @zones: The hash zones to query.
 * @stats: The vdo statistics structure to fill.
 */
void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
{
	zone_count_t zone;

	for (zone = 0; zone < zones->zone_count; zone++)
		get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);

	get_index_statistics(zones, &stats->index);

	/*
	 * Reported "timeouts" include both genuine expirations and queries dropped
	 * because no dedupe context was available.
	 */
	stats->dedupe_advice_timeouts =
		(atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
}
struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
const struct uds_record_name *name)
{
u32 hash = name->name[0];
hash = (hash * zones->zone_count) >> 8;
return &zones->zones[hash];
}
/**
 * dump_hash_lock() - Log one hash lock's state to the kernel log.
 * @lock: The lock to dump.
 *
 * Locks sitting on the pool free list are idle and are skipped.
 */
static void dump_hash_lock(const struct hash_lock *lock)
{
	const char *state;

	if (!list_empty(&lock->pool_node)) {
		/* This lock is on the free list; nothing interesting to report. */
		return;
	}

	/*
	 * %px is used deliberately here: these addresses serve as identifiers to
	 * correlate locks across dump lines.
	 */
	state = get_hash_lock_state_name(lock->state);
	vdo_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
		     lock, state, (lock->registered ? 'D' : 'U'),
		     (unsigned long long) lock->duplicate.pbn,
		     lock->duplicate.state, lock->reference_count,
		     vdo_waitq_num_waiters(&lock->waiters), lock->agent);
}
/*
 * Translate an index state into a human-readable name. Reads fields normally guarded
 * by zones->lock, so it is expected to be called with the lock held — confirm against
 * callers.
 */
static const char *index_state_to_string(struct hash_zones *zones,
					 enum index_state state)
{
	if (!vdo_is_state_normal(&zones->state))
		return SUSPENDED;

	if (state == IS_CLOSED)
		return zones->error_flag ? ERROR : CLOSED;

	if (state == IS_CHANGING)
		return (zones->index_target == IS_OPENED) ? OPENING : CLOSING;

	if (state == IS_OPENED)
		return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;

	return UNKNOWN;
}
/**
 * dump_hash_zone() - Dump a hash zone's non-idle locks to the kernel log.
 * @zone: The zone to dump.
 */
static void dump_hash_zone(const struct hash_zone *zone)
{
	data_vio_count_t i;

	if (zone->hash_lock_map == NULL) {
		/* The zone was never fully initialized. */
		vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
		return;
	}

	vdo_log_info("struct hash_zone %u: mapSize=%zu",
		     zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
		dump_hash_lock(&zone->lock_array[i]);
}
/**
 * vdo_dump_hash_zones() - Dump information about all hash zones to the kernel log.
 * @zones: The zones to dump.
 */
void vdo_dump_hash_zones(struct hash_zones *zones)
{
	const char *state, *target;
	zone_count_t zone;

	/* Sample state under the lock, but log outside of it. */
	spin_lock(&zones->lock);
	state = index_state_to_string(zones, zones->index_state);
	target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
	spin_unlock(&zones->lock);

	vdo_log_info("UDS index: state: %s", state);
	if (target != NULL)
		vdo_log_info("UDS index: changing to state: %s", target);

	for (zone = 0; zone < zones->zone_count; zone++)
		dump_hash_zone(&zones->zones[zone]);
}
/**
 * vdo_set_dedupe_index_timeout_interval() - Set the dedupe request timeout.
 * @value: The timeout in milliseconds; clamped to at most 120000 and to a floor of
 *         two jiffies.
 */
void vdo_set_dedupe_index_timeout_interval(unsigned int value)
{
	u64 timeout_jiffies;

	/* Cap the timeout at two minutes. */
	if (value > 120000)
		value = 120000;

	timeout_jiffies = msecs_to_jiffies(value);
	if (timeout_jiffies < 2) {
		/* Enforce a two-jiffy floor and recompute the value it implies. */
		timeout_jiffies = 2;
		value = jiffies_to_msecs(timeout_jiffies);
	}

	vdo_dedupe_index_timeout_interval = value;
	vdo_dedupe_index_timeout_jiffies = timeout_jiffies;
}
/**
 * vdo_set_dedupe_index_min_timer_interval() - Set the minimum expiration timer
 *                                             interval.
 * @value: The interval in milliseconds; clamped to at most 1000 and to a floor of two
 *         jiffies.
 */
void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
{
	u64 interval_jiffies;

	/* Cap the minimum interval at one second. */
	if (value > 1000)
		value = 1000;

	interval_jiffies = msecs_to_jiffies(value);
	if (interval_jiffies < 2) {
		/* Enforce a two-jiffy floor and recompute the value it implies. */
		interval_jiffies = 2;
		value = jiffies_to_msecs(interval_jiffies);
	}

	vdo_dedupe_index_min_timer_interval = value;
	vdo_dedupe_index_min_timer_jiffies = interval_jiffies;
}
/**
 * acquire_context() - Get a dedupe context for a query, if one is available.
 * @zone: The hash zone; must be called on the zone's thread.
 *
 * Prefers an idle context from the available list; failing that, reuses a context
 * whose timed-out request has since completed (such a context is still counted as
 * active, so the count is only incremented on the first path).
 *
 * Return: A context, or NULL if none could be acquired.
 */
static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
{
	struct dedupe_context *context;
	struct funnel_queue_entry *entry;

	assert_in_hash_zone(zone, __func__);

	if (!list_empty(&zone->available)) {
		WRITE_ONCE(zone->active, zone->active + 1);
		context = list_first_entry(&zone->available, struct dedupe_context,
					   list_entry);
		list_del_init(&context->list_entry);
		return context;
	}

	/* Fall back to a context whose timed-out request has since completed. */
	entry = vdo_funnel_queue_poll(zone->timed_out_complete);
	return ((entry == NULL) ?
		NULL : container_of(entry, struct dedupe_context, queue_entry));
}
/*
 * Fill in a UDS request for the given operation. POST and UPDATE requests also carry
 * dedupe advice as new metadata: a version byte, the mapping state byte, and the
 * little-endian PBN.
 */
static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
				enum uds_request_type operation)
{
	struct uds_record_data *encoding = &request->new_metadata;
	size_t offset = 0;

	request->record_name = data_vio->record_name;
	request->type = operation;

	/* Only POST and UPDATE requests send advice to the index. */
	if ((operation != UDS_POST) && (operation != UDS_UPDATE))
		return;

	encoding->data[offset++] = UDS_ADVICE_VERSION;
	encoding->data[offset++] = data_vio->new_mapped.state;
	put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
	offset += sizeof(u64);
	BUG_ON(offset != UDS_ADVICE_SIZE);
}
/**
 * query_index() - Query the UDS index for dedupe advice on behalf of a data_vio.
 * @data_vio: The requestor; must be in its hash zone.
 * @operation: The UDS request type to launch.
 *
 * If dedupe is disabled, or no dedupe context can be acquired, the data_vio simply
 * continues without advice. Otherwise the request is prepared, queued on the zone's
 * pending list, covered by the expiration timer, and launched.
 */
static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
{
	int result;
	struct dedupe_context *context;
	struct vdo *vdo = vdo_from_data_vio(data_vio);
	struct hash_zone *zone = data_vio->hash_zone;

	assert_data_vio_in_hash_zone(data_vio);

	/* Dedupe may be disabled administratively; continue without advice. */
	if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
		continue_data_vio(data_vio);
		return;
	}

	context = acquire_context(zone);
	if (context == NULL) {
		/* All contexts are busy; count the miss and continue without advice. */
		atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
		continue_data_vio(data_vio);
		return;
	}

	data_vio->dedupe_context = context;
	context->requestor = data_vio;
	context->submission_jiffies = jiffies;
	prepare_uds_request(&context->request, data_vio, operation);
	atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
	/* Tail insertion keeps the pending list in submission order for expiration. */
	list_add_tail(&context->list_entry, &zone->pending);
	start_expiration_timer(context);

	result = uds_launch_request(&context->request);
	if (result != UDS_SUCCESS) {
		/* Treat a launch failure as an immediately completed (failed) request. */
		context->request.status = result;
		finish_index_operation(&context->request);
	}
}
/**
 * set_target_state() - Set the target index state and associated flags.
 * @zones: The hash zones.
 * @target: The new target index state.
 * @change_dedupe: Whether to update the dedupe flag.
 * @dedupe: The new dedupe flag value (only used when @change_dedupe is set).
 * @set_create: Whether to request index creation.
 *
 * Launches a state change if one is now needed, and logs the new target state when
 * its printable name changed.
 */
static void set_target_state(struct hash_zones *zones, enum index_state target,
			     bool change_dedupe, bool dedupe, bool set_create)
{
	const char *old_state, *new_state;

	spin_lock(&zones->lock);
	old_state = index_state_to_string(zones, zones->index_target);
	if (change_dedupe)
		WRITE_ONCE(zones->dedupe_flag, dedupe);

	if (set_create)
		zones->create_flag = true;

	zones->index_target = target;
	launch_dedupe_state_change(zones);
	new_state = index_state_to_string(zones, zones->index_target);
	spin_unlock(&zones->lock);

	/*
	 * The state names are interned string constants, so pointer comparison is
	 * sufficient to detect a change.
	 */
	if (old_state != new_state)
		vdo_log_info("Setting UDS index target state to %s", new_state);
}
/* Return the printable name of the current index state (takes zones->lock). */
const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
{
	const char *name;

	spin_lock(&zones->lock);
	name = index_state_to_string(zones, zones->index_state);
	spin_unlock(&zones->lock);

	return name;
}
int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
{
if (strcasecmp(name, "index-close") == 0) {
set_target_state(zones, IS_CLOSED, false, false, false);
return 0;
} else if (strcasecmp(name, "index-create") == 0) {
set_target_state(zones, IS_OPENED, false, false, true);
return 0;
} else if (strcasecmp(name, "index-disable") == 0) {
set_target_state(zones, IS_OPENED, true, false, false);
return 0;
} else if (strcasecmp(name, "index-enable") == 0) {
set_target_state(zones, IS_OPENED, true, true, false);
return 0;
}
return -EINVAL;
}
/* Put the hash zones into normal operation (e.g. after loading completes). */
void vdo_set_dedupe_state_normal(struct hash_zones *zones)
{
	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
}
/**
 * vdo_start_dedupe_index() - Start the dedupe index.
 * @zones: The hash zones owning the index.
 * @create_flag: If true, create a new index instead of loading an existing one.
 */
void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
{
	set_target_state(zones, IS_OPENED, true, true, create_flag);
}