#include <new>
#include <stdlib.h>
#include "DoublyLinkedList.h"
#include "fssh_atomic.h"
#include "fssh_errno.h"
#include "fssh_fs_cache.h"
#include "fssh_kernel_export.h"
#include "fssh_lock.h"
#include "fssh_string.h"
#include "fssh_unistd.h"
#include "hash.h"
#include "vfs.h"
#ifdef TRACE_BLOCK_CACHE
# define TRACE(x) fssh_dprintf x
#else
# define TRACE(x) ;
#endif
using std::nothrow;
#define FATAL(x) fssh_panic x
#undef offsetof
#define offsetof(struct, member) 0
namespace FSShell {
struct hash_table;
struct vm_page;
#undef DEBUG_CHANGED
struct cache_transaction;
struct cached_block;
struct block_cache;
typedef DoublyLinkedListLink<cached_block> block_link;
struct cached_block {
cached_block* next;
cached_block* transaction_next;
block_link link;
fssh_off_t block_number;
void* current_data;
void* original_data;
void* parent_data;
#ifdef DEBUG_CHANGED
void *compare;
#endif
int32_t ref_count;
int32_t accessed;
bool busy : 1;
bool is_writing : 1;
bool is_dirty : 1;
bool unused : 1;
bool discard : 1;
cache_transaction* transaction;
cache_transaction* previous_transaction;
static int Compare(void* _cacheEntry, const void* _block);
static uint32_t Hash(void* _cacheEntry, const void* _block, uint32_t range);
};
typedef DoublyLinkedList<cached_block,
DoublyLinkedListMemberGetLink<cached_block,
&cached_block::link> > block_list;
struct cache_notification : DoublyLinkedListLinkImpl<cache_notification> {
int32_t transaction_id;
int32_t events_pending;
int32_t events;
fssh_transaction_notification_hook hook;
void* data;
bool delete_after_event;
};
typedef DoublyLinkedList<cache_notification> NotificationList;
struct block_cache {
hash_table* hash;
fssh_mutex lock;
int fd;
fssh_off_t max_blocks;
fssh_size_t block_size;
int32_t allocated_block_count;
int32_t next_transaction_id;
cache_transaction* last_transaction;
hash_table* transaction_hash;
block_list unused_blocks;
bool read_only;
NotificationList pending_notifications;
block_cache(int fd, fssh_off_t numBlocks,
fssh_size_t blockSize, bool readOnly);
~block_cache();
fssh_status_t Init();
void Free(void* buffer);
void* Allocate();
void RemoveUnusedBlocks(int32_t maxAccessed = INT32_MAX,
int32_t count = INT32_MAX);
void RemoveBlock(cached_block* block);
void DiscardBlock(cached_block* block);
void FreeBlock(cached_block* block);
cached_block* NewBlock(fssh_off_t blockNumber);
};
static const int32_t kMaxBlockCount = 1024;
struct cache_listener;
typedef DoublyLinkedListLink<cache_listener> listener_link;
struct cache_listener : cache_notification {
listener_link link;
};
typedef DoublyLinkedList<cache_listener,
DoublyLinkedListMemberGetLink<cache_listener,
&cache_listener::link> > ListenerList;
struct cache_transaction {
cache_transaction();
cache_transaction* next;
int32_t id;
int32_t num_blocks;
int32_t main_num_blocks;
int32_t sub_num_blocks;
cached_block* first_block;
block_list blocks;
fssh_transaction_notification_hook notification_hook;
void* notification_data;
ListenerList listeners;
bool open;
bool has_sub_transaction;
};
static fssh_status_t write_cached_block(block_cache* cache, cached_block* block,
bool deleteTransaction = true);
static fssh_mutex sNotificationsLock;
static inline bool
is_closing_event(int32_t event)
{
return (event & (FSSH_TRANSACTION_ABORTED | FSSH_TRANSACTION_ENDED)) != 0;
}
static inline bool
is_written_event(int32_t event)
{
return (event & FSSH_TRANSACTION_WRITTEN) != 0;
}
static bool
get_next_pending_event(cache_notification* notification, int32_t* _event)
{
for (int32_t eventMask = 1; eventMask <= FSSH_TRANSACTION_IDLE; eventMask <<= 1) {
int32_t pending = fssh_atomic_and(¬ification->events_pending,
~eventMask);
bool more = (pending & ~eventMask) != 0;
if ((pending & eventMask) != 0) {
*_event = eventMask;
return more;
}
}
return false;
}
static void
flush_pending_notifications(block_cache* cache)
{
while (true) {
MutexLocker locker(sNotificationsLock);
cache_notification* notification = cache->pending_notifications.Head();
if (notification == NULL)
return;
bool deleteAfterEvent = false;
int32_t event = -1;
if (!get_next_pending_event(notification, &event)) {
cache->pending_notifications.Remove(notification);
deleteAfterEvent = notification->delete_after_event;
}
if (event >= 0) {
cache_notification copy = *notification;
locker.Unlock();
copy.hook(copy.transaction_id, event, copy.data);
locker.Lock();
}
if (deleteAfterEvent)
delete notification;
}
}
static void
set_notification(cache_transaction* transaction,
cache_notification ¬ification, int32_t events,
fssh_transaction_notification_hook hook, void* data)
{
notification.transaction_id = transaction != NULL ? transaction->id : -1;
notification.events_pending = 0;
notification.events = events;
notification.hook = hook;
notification.data = data;
notification.delete_after_event = false;
}
static void
delete_notification(cache_notification* notification)
{
MutexLocker locker(sNotificationsLock);
if (notification->events_pending != 0)
notification->delete_after_event = true;
else
delete notification;
}
static void
add_notification(block_cache* cache, cache_notification* notification,
int32_t event, bool deleteNotification)
{
if (notification->hook == NULL)
return;
int32_t pending = fssh_atomic_or(¬ification->events_pending, event);
if (pending == 0) {
MutexLocker locker(sNotificationsLock);
if (deleteNotification)
notification->delete_after_event = true;
cache->pending_notifications.Add(notification);
} else if (deleteNotification) {
delete_notification(notification);
}
}
static void
notify_transaction_listeners(block_cache* cache, cache_transaction* transaction,
int32_t event)
{
bool isClosing = is_closing_event(event);
bool isWritten = is_written_event(event);
ListenerList::Iterator iterator = transaction->listeners.GetIterator();
while (iterator.HasNext()) {
cache_listener* listener = iterator.Next();
bool remove = (isClosing && !is_written_event(listener->events))
|| (isWritten && is_written_event(listener->events));
if (remove)
iterator.Remove();
if ((listener->events & event) != 0)
add_notification(cache, listener, event, remove);
else if (remove)
delete_notification(listener);
}
flush_pending_notifications(cache);
}
static void
remove_transaction_listeners(block_cache* cache, cache_transaction* transaction)
{
ListenerList::Iterator iterator = transaction->listeners.GetIterator();
while (iterator.HasNext()) {
cache_listener* listener = iterator.Next();
iterator.Remove();
delete_notification(listener);
}
}
static fssh_status_t
add_transaction_listener(block_cache* cache, cache_transaction* transaction,
int32_t events, fssh_transaction_notification_hook hookFunction, void* data)
{
ListenerList::Iterator iterator = transaction->listeners.GetIterator();
while (iterator.HasNext()) {
cache_listener* listener = iterator.Next();
if (listener->data == data && listener->hook == hookFunction) {
listener->events |= events;
return FSSH_B_OK;
}
}
cache_listener* listener = new(std::nothrow) cache_listener;
if (listener == NULL)
return FSSH_B_NO_MEMORY;
set_notification(transaction, *listener, events, hookFunction, data);
transaction->listeners.Add(listener);
return FSSH_B_OK;
}
cache_transaction::cache_transaction()
{
num_blocks = 0;
main_num_blocks = 0;
sub_num_blocks = 0;
first_block = NULL;
notification_hook = NULL;
notification_data = NULL;
open = true;
has_sub_transaction = false;
}
static int
transaction_compare(void* _transaction, const void* _id)
{
cache_transaction* transaction = (cache_transaction*)_transaction;
const int32_t* id = (const int32_t*)_id;
return transaction->id - *id;
}
static uint32_t
transaction_hash(void* _transaction, const void* _id, uint32_t range)
{
cache_transaction* transaction = (cache_transaction*)_transaction;
const int32_t* id = (const int32_t*)_id;
if (transaction != NULL)
return transaction->id % range;
return (uint32_t)*id % range;
}
static void
delete_transaction(block_cache* cache, cache_transaction* transaction)
{
if (cache->last_transaction == transaction)
cache->last_transaction = NULL;
remove_transaction_listeners(cache, transaction);
delete transaction;
}
static cache_transaction*
lookup_transaction(block_cache* cache, int32_t id)
{
return (cache_transaction*)hash_lookup(cache->transaction_hash, &id);
}
int
cached_block::Compare(void* _cacheEntry, const void* _block)
{
cached_block* cacheEntry = (cached_block*)_cacheEntry;
const fssh_off_t* block = (const fssh_off_t*)_block;
fssh_off_t diff = cacheEntry->block_number - *block;
if (diff > 0)
return 1;
return diff < 0 ? -1 : 0;
}
uint32_t
cached_block::Hash(void* _cacheEntry, const void* _block, uint32_t range)
{
cached_block* cacheEntry = (cached_block*)_cacheEntry;
const fssh_off_t* block = (const fssh_off_t*)_block;
if (cacheEntry != NULL)
return cacheEntry->block_number % range;
return (uint64_t)*block % range;
}
block_cache::block_cache(int _fd, fssh_off_t numBlocks, fssh_size_t blockSize,
bool readOnly)
:
hash(NULL),
fd(_fd),
max_blocks(numBlocks),
block_size(blockSize),
allocated_block_count(0),
next_transaction_id(1),
last_transaction(NULL),
transaction_hash(NULL),
read_only(readOnly)
{
}
block_cache::~block_cache()
{
hash_uninit(transaction_hash);
hash_uninit(hash);
fssh_mutex_destroy(&lock);
}
fssh_status_t
block_cache::Init()
{
fssh_mutex_init(&lock, "block cache");
if (lock.sem < FSSH_B_OK)
return lock.sem;
hash = hash_init(128, offsetof(cached_block, next), &cached_block::Compare,
&cached_block::Hash);
if (hash == NULL)
return FSSH_B_NO_MEMORY;
transaction_hash = hash_init(16, offsetof(cache_transaction, next),
&transaction_compare, &FSShell::transaction_hash);
if (transaction_hash == NULL)
return FSSH_B_NO_MEMORY;
return FSSH_B_OK;
}
void
block_cache::Free(void* buffer)
{
if (buffer == NULL)
return;
free(buffer);
}
void*
block_cache::Allocate()
{
return malloc(block_size);
}
void
block_cache::FreeBlock(cached_block* block)
{
Free(block->current_data);
if (block->original_data != NULL || block->parent_data != NULL) {
fssh_panic("block_cache::FreeBlock(): %" FSSH_B_PRIdOFF
", original %p, parent %p\n", block->block_number,
block->original_data, block->parent_data);
}
#ifdef DEBUG_CHANGED
Free(block->compare);
#endif
delete block;
}
cached_block*
block_cache::NewBlock(fssh_off_t blockNumber)
{
cached_block* block = new(nothrow) cached_block;
if (block == NULL) {
FATAL(("could not allocate block!\n"));
return NULL;
}
if (allocated_block_count >= kMaxBlockCount) {
RemoveUnusedBlocks(INT32_MAX,
allocated_block_count - kMaxBlockCount + 1);
}
block->current_data = Allocate();
if (block->current_data == NULL) {
FATAL(("could not allocate block data!\n"));
delete block;
return NULL;
}
block->block_number = blockNumber;
block->ref_count = 0;
block->accessed = 0;
block->transaction_next = NULL;
block->transaction = block->previous_transaction = NULL;
block->original_data = NULL;
block->parent_data = NULL;
block->is_dirty = false;
block->unused = false;
block->discard = false;
#ifdef DEBUG_CHANGED
block->compare = NULL;
#endif
allocated_block_count++;
return block;
}
void
block_cache::RemoveUnusedBlocks(int32_t maxAccessed, int32_t count)
{
TRACE(("block_cache: remove up to %ld unused blocks\n", count));
for (block_list::Iterator iterator = unused_blocks.GetIterator();
cached_block *block = iterator.Next();) {
if (maxAccessed < block->accessed)
continue;
TRACE((" remove block %lld, accessed %ld times\n",
block->block_number, block->accessed));
if (block->is_dirty && !block->discard)
write_cached_block(this, block, false);
iterator.Remove();
RemoveBlock(block);
if (--count <= 0)
break;
}
}
void
block_cache::RemoveBlock(cached_block* block)
{
hash_remove(hash, block);
FreeBlock(block);
}
void
block_cache::DiscardBlock(cached_block* block)
{
if (block->parent_data != NULL && block->parent_data != block->current_data)
Free(block->parent_data);
block->parent_data = NULL;
if (block->original_data != NULL) {
Free(block->original_data);
block->original_data = NULL;
}
RemoveBlock(block);
}
static void
put_cached_block(block_cache* cache, cached_block* block)
{
#ifdef DEBUG_CHANGED
if (!block->is_dirty && block->compare != NULL
&& memcmp(block->current_data, block->compare, cache->block_size)) {
fssh_dprintf("new block:\n");
fssh_dump_block((const char*)block->current_data, 256, " ");
fssh_dprintf("unchanged block:\n");
fssh_dump_block((const char*)block->compare, 256, " ");
write_cached_block(cache, block);
fssh_panic("block_cache: supposed to be clean block was changed!\n");
cache->Free(block->compare);
block->compare = NULL;
}
#endif
if (--block->ref_count == 0
&& block->transaction == NULL && block->previous_transaction == NULL) {
if (block->discard) {
cache->RemoveBlock(block);
} else {
block->unused = true;
cache->unused_blocks.Add(block);
}
}
if (cache->allocated_block_count > kMaxBlockCount) {
cache->RemoveUnusedBlocks(INT32_MAX,
cache->allocated_block_count - kMaxBlockCount);
}
}
static void
put_cached_block(block_cache* cache, fssh_off_t blockNumber)
{
if (blockNumber < 0 || blockNumber >= cache->max_blocks) {
fssh_panic("put_cached_block: invalid block number %" FSSH_B_PRIdOFF
" (max %" FSSH_B_PRIdOFF ")", blockNumber, cache->max_blocks - 1);
}
cached_block* block = (cached_block*)hash_lookup(cache->hash, &blockNumber);
if (block != NULL)
put_cached_block(cache, block);
}
static fssh_status_t
get_cached_block(block_cache* cache, fssh_off_t blockNumber, bool* _allocated,
bool readBlock, cached_block** _block)
{
if (blockNumber < 0 || blockNumber >= cache->max_blocks) {
fssh_panic("get_cached_block: invalid block number %" FSSH_B_PRIdOFF
" (max %" FSSH_B_PRIdOFF ")", blockNumber, cache->max_blocks - 1);
return FSSH_B_BAD_VALUE;
}
cached_block* block = (cached_block*)hash_lookup(cache->hash,
&blockNumber);
*_allocated = false;
if (block == NULL) {
block = cache->NewBlock(blockNumber);
if (block == NULL)
return FSSH_B_NO_MEMORY;
hash_insert(cache->hash, block);
*_allocated = true;
}
if (*_allocated && readBlock) {
int32_t blockSize = cache->block_size;
if (fssh_read_pos(cache->fd, blockNumber * blockSize, block->current_data,
blockSize) < blockSize) {
cache->RemoveBlock(block);
FATAL(("could not read block %" FSSH_B_PRIdOFF "\n", blockNumber));
return fssh_errno;
}
}
if (block->unused) {
block->unused = false;
cache->unused_blocks.Remove(block);
}
block->ref_count++;
block->accessed++;
*_block = block;
return FSSH_B_OK;
}
static fssh_status_t
get_writable_cached_block(block_cache* cache, fssh_off_t blockNumber,
int32_t transactionID, bool cleared, void** _block)
{
TRACE(("get_writable_cached_block(blockNumber = %lld, transaction = %d)\n",
blockNumber, transactionID));
if (blockNumber < 0 || blockNumber >= cache->max_blocks) {
fssh_panic("get_writable_cached_block: invalid block number %"
FSSH_B_PRIdOFF " (max %" FSSH_B_PRIdOFF ")", blockNumber,
cache->max_blocks - 1);
return FSSH_B_BAD_VALUE;
}
bool allocated;
cached_block* block;
fssh_status_t status = get_cached_block(cache, blockNumber, &allocated,
!cleared, &block);
if (status != FSSH_B_OK)
return status;
block->discard = false;
if (transactionID == -1) {
if (cleared)
fssh_memset(block->current_data, 0, cache->block_size);
block->is_dirty = true;
*_block = block->current_data;
return FSSH_B_OK;
}
cache_transaction* transaction = block->transaction;
if (transaction != NULL && transaction->id != transactionID) {
fssh_panic("get_writable_cached_block(): asked to get busy writable block (transaction %d)\n", (int)transaction->id);
put_cached_block(cache, block);
return FSSH_B_BAD_VALUE;
}
if (transaction == NULL && transactionID != -1) {
transaction = lookup_transaction(cache, transactionID);
if (transaction == NULL) {
fssh_panic("get_writable_cached_block(): invalid transaction %d!\n",
(int)transactionID);
put_cached_block(cache, block);
return FSSH_B_BAD_VALUE;
}
if (!transaction->open) {
fssh_panic("get_writable_cached_block(): transaction already done!\n");
put_cached_block(cache, block);
return FSSH_B_BAD_VALUE;
}
block->transaction = transaction;
block->transaction_next = transaction->first_block;
transaction->first_block = block;
transaction->num_blocks++;
}
bool wasUnchanged = block->original_data == NULL
|| block->previous_transaction != NULL;
if (!(allocated && cleared) && block->original_data == NULL) {
block->original_data = cache->Allocate();
if (block->original_data == NULL) {
FATAL(("could not allocate original_data\n"));
put_cached_block(cache, block);
return FSSH_B_NO_MEMORY;
}
fssh_memcpy(block->original_data, block->current_data, cache->block_size);
}
if (block->parent_data == block->current_data) {
block->parent_data = cache->Allocate();
if (block->parent_data == NULL) {
FATAL(("could not allocate parent\n"));
put_cached_block(cache, block);
return FSSH_B_NO_MEMORY;
}
fssh_memcpy(block->parent_data, block->current_data, cache->block_size);
transaction->sub_num_blocks++;
} else if (transaction != NULL && transaction->has_sub_transaction
&& block->parent_data == NULL && wasUnchanged)
transaction->sub_num_blocks++;
if (cleared)
fssh_memset(block->current_data, 0, cache->block_size);
block->is_dirty = true;
*_block = block->current_data;
return FSSH_B_OK;
}
static fssh_status_t
write_cached_block(block_cache* cache, cached_block* block,
bool deleteTransaction)
{
cache_transaction* previous = block->previous_transaction;
int32_t blockSize = cache->block_size;
void* data = previous && block->original_data
? block->original_data : block->current_data;
TRACE(("write_cached_block(block %lld)\n", block->block_number));
fssh_ssize_t written = fssh_write_pos(cache->fd, block->block_number * blockSize,
data, blockSize);
if (written < blockSize) {
FATAL(("could not write back block %" FSSH_B_PRIdOFF " (%s)\n",
block->block_number, fssh_strerror(fssh_get_errno())));
return FSSH_B_IO_ERROR;
}
if (data == block->current_data)
block->is_dirty = false;
if (previous != NULL) {
previous->blocks.Remove(block);
block->previous_transaction = NULL;
if (block->original_data != NULL && block->transaction == NULL) {
cache->Free(block->original_data);
block->original_data = NULL;
}
if (--previous->num_blocks == 0) {
TRACE(("cache transaction %ld finished!\n", previous->id));
notify_transaction_listeners(cache, previous, FSSH_TRANSACTION_WRITTEN);
if (deleteTransaction) {
hash_remove(cache->transaction_hash, previous);
delete_transaction(cache, previous);
}
}
}
if (block->transaction == NULL && block->ref_count == 0 && !block->unused) {
block->unused = true;
cache->unused_blocks.Add(block);
}
return FSSH_B_OK;
}
static void
wait_for_notifications(block_cache* cache)
{
}
fssh_status_t
block_cache_init()
{
fssh_mutex_init(&sNotificationsLock, "block cache notifications");
return FSSH_B_OK;
}
}
using namespace FSShell;
int32_t
fssh_cache_start_transaction(void* _cache)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
if (cache->last_transaction && cache->last_transaction->open) {
fssh_panic("last transaction (%d) still open!\n",
(int)cache->last_transaction->id);
}
cache_transaction* transaction = new(nothrow) cache_transaction;
if (transaction == NULL)
return FSSH_B_NO_MEMORY;
transaction->id = fssh_atomic_add(&cache->next_transaction_id, 1);
cache->last_transaction = transaction;
TRACE(("cache_start_transaction(): id %d started\n", transaction->id));
hash_insert(cache->transaction_hash, transaction);
return transaction->id;
}
fssh_status_t
fssh_cache_sync_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
fssh_status_t status = FSSH_B_ENTRY_NOT_FOUND;
TRACE(("cache_sync_transaction(id %d)\n", id));
hash_iterator iterator;
hash_open(cache->transaction_hash, &iterator);
cache_transaction* transaction;
while ((transaction = (cache_transaction*)hash_next(
cache->transaction_hash, &iterator)) != NULL) {
if (transaction->id <= id && !transaction->open) {
while (transaction->num_blocks > 0) {
status = write_cached_block(cache, transaction->blocks.Head(),
false);
if (status != FSSH_B_OK)
return status;
}
hash_remove_current(cache->transaction_hash, &iterator);
delete_transaction(cache, transaction);
}
}
hash_close(cache->transaction_hash, &iterator, false);
locker.Unlock();
wait_for_notifications(cache);
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_end_transaction(void* _cache, int32_t id,
fssh_transaction_notification_hook hook, void* data)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("cache_end_transaction(id = %d)\n", id));
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL) {
fssh_panic("cache_end_transaction(): invalid transaction ID\n");
return FSSH_B_BAD_VALUE;
}
notify_transaction_listeners(cache, transaction, FSSH_TRANSACTION_ENDED);
if (add_transaction_listener(cache, transaction, FSSH_TRANSACTION_WRITTEN,
hook, data) != FSSH_B_OK) {
return FSSH_B_NO_MEMORY;
}
cached_block* block = transaction->first_block;
cached_block* next;
for (; block != NULL; block = next) {
next = block->transaction_next;
if (block->previous_transaction != NULL) {
write_cached_block(cache, block);
}
if (block->discard) {
cache->DiscardBlock(block);
transaction->num_blocks--;
continue;
}
if (block->original_data != NULL) {
cache->Free(block->original_data);
block->original_data = NULL;
}
if (transaction->has_sub_transaction) {
if (block->parent_data != block->current_data)
cache->Free(block->parent_data);
block->parent_data = NULL;
}
transaction->blocks.Add(block);
block->previous_transaction = transaction;
block->transaction_next = NULL;
block->transaction = NULL;
}
transaction->open = false;
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_abort_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("cache_abort_transaction(id = %ld)\n", id));
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL) {
fssh_panic("cache_abort_transaction(): invalid transaction ID\n");
return FSSH_B_BAD_VALUE;
}
notify_transaction_listeners(cache, transaction, FSSH_TRANSACTION_ABORTED);
cached_block* block = transaction->first_block;
cached_block* next;
for (; block != NULL; block = next) {
next = block->transaction_next;
if (block->original_data != NULL) {
TRACE(("cache_abort_transaction(id = %ld): restored contents of block %lld\n",
transaction->id, block->block_number));
fssh_memcpy(block->current_data, block->original_data, cache->block_size);
cache->Free(block->original_data);
block->original_data = NULL;
}
if (transaction->has_sub_transaction) {
if (block->parent_data != block->current_data)
cache->Free(block->parent_data);
block->parent_data = NULL;
}
block->transaction_next = NULL;
block->transaction = NULL;
block->discard = false;
}
hash_remove(cache->transaction_hash, transaction);
delete_transaction(cache, transaction);
return FSSH_B_OK;
}
int32_t
fssh_cache_detach_sub_transaction(void* _cache, int32_t id,
fssh_transaction_notification_hook hook, void* data)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("cache_detach_sub_transaction(id = %d)\n", id));
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL) {
fssh_panic("cache_detach_sub_transaction(): invalid transaction ID\n");
return FSSH_B_BAD_VALUE;
}
if (!transaction->has_sub_transaction)
return FSSH_B_BAD_VALUE;
cache_transaction* newTransaction = new(nothrow) cache_transaction;
if (newTransaction == NULL)
return FSSH_B_NO_MEMORY;
newTransaction->id = fssh_atomic_add(&cache->next_transaction_id, 1);
notify_transaction_listeners(cache, transaction, FSSH_TRANSACTION_ENDED);
if (add_transaction_listener(cache, transaction, FSSH_TRANSACTION_WRITTEN,
hook, data) != FSSH_B_OK) {
delete newTransaction;
return FSSH_B_NO_MEMORY;
}
cached_block* block = transaction->first_block;
cached_block* last = NULL;
cached_block* next;
for (; block != NULL; block = next) {
next = block->transaction_next;
if (block->previous_transaction != NULL) {
write_cached_block(cache, block);
}
if (block->discard) {
cache->DiscardBlock(block);
transaction->main_num_blocks--;
continue;
}
if (block->original_data != NULL && block->parent_data != NULL) {
cache->Free(block->original_data);
block->original_data = NULL;
}
if (block->parent_data == NULL
|| block->parent_data != block->current_data) {
block->original_data = block->parent_data;
if (last == NULL)
newTransaction->first_block = block;
else
last->transaction_next = block;
block->transaction = newTransaction;
last = block;
} else
block->transaction = NULL;
if (block->parent_data != NULL) {
transaction->blocks.Add(block);
block->previous_transaction = transaction;
block->parent_data = NULL;
}
block->transaction_next = NULL;
}
newTransaction->num_blocks = transaction->sub_num_blocks;
transaction->open = false;
transaction->has_sub_transaction = false;
transaction->num_blocks = transaction->main_num_blocks;
transaction->sub_num_blocks = 0;
hash_insert(cache->transaction_hash, newTransaction);
cache->last_transaction = newTransaction;
return newTransaction->id;
}
fssh_status_t
fssh_cache_abort_sub_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("cache_abort_sub_transaction(id = %ld)\n", id));
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL) {
fssh_panic("cache_abort_sub_transaction(): invalid transaction ID\n");
return FSSH_B_BAD_VALUE;
}
if (!transaction->has_sub_transaction)
return FSSH_B_BAD_VALUE;
notify_transaction_listeners(cache, transaction, FSSH_TRANSACTION_ABORTED);
cached_block* block = transaction->first_block;
cached_block* next;
for (; block != NULL; block = next) {
next = block->transaction_next;
if (block->parent_data == NULL) {
if (block->original_data != NULL) {
fssh_memcpy(block->current_data, block->original_data,
cache->block_size);
}
} else if (block->parent_data != block->current_data) {
TRACE(("cache_abort_sub_transaction(id = %ld): restored contents of block %lld\n",
transaction->id, block->block_number));
fssh_memcpy(block->current_data, block->parent_data,
cache->block_size);
cache->Free(block->parent_data);
}
block->parent_data = NULL;
block->discard = false;
}
transaction->has_sub_transaction = false;
transaction->sub_num_blocks = 0;
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_start_sub_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("cache_start_sub_transaction(id = %d)\n", id));
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL) {
fssh_panic("cache_start_sub_transaction(): invalid transaction ID %d\n", (int)id);
return FSSH_B_BAD_VALUE;
}
notify_transaction_listeners(cache, transaction, FSSH_TRANSACTION_ENDED);
cached_block* block = transaction->first_block;
cached_block* last = NULL;
cached_block* next;
for (; block != NULL; block = next) {
next = block->transaction_next;
if (block->discard) {
if (last != NULL)
last->transaction_next = next;
else
transaction->first_block = next;
cache->DiscardBlock(block);
transaction->num_blocks--;
continue;
}
if (transaction->has_sub_transaction
&& block->parent_data != NULL
&& block->parent_data != block->current_data) {
cache->Free(block->parent_data);
}
block->parent_data = block->current_data;
last = block;
}
transaction->has_sub_transaction = true;
transaction->main_num_blocks = transaction->num_blocks;
transaction->sub_num_blocks = 0;
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_add_transaction_listener(void* _cache, int32_t id, int32_t events,
fssh_transaction_notification_hook hookFunction, void* data)
{
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_remove_transaction_listener(void* _cache, int32_t id,
fssh_transaction_notification_hook hookFunction, void* data)
{
return FSSH_B_OK;
}
fssh_status_t
fssh_cache_next_block_in_transaction(void* _cache, int32_t id, bool mainOnly,
long* _cookie, fssh_off_t* _blockNumber, void** _data,
void** _unchangedData)
{
cached_block* block = (cached_block*)*_cookie;
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL || !transaction->open)
return FSSH_B_BAD_VALUE;
if (block == NULL)
block = transaction->first_block;
else
block = block->transaction_next;
if (transaction->has_sub_transaction) {
if (mainOnly) {
while (block != NULL && block->parent_data == NULL)
block = block->transaction_next;
} else {
while (block != NULL && block->discard)
block = block->transaction_next;
}
}
if (block == NULL)
return FSSH_B_ENTRY_NOT_FOUND;
if (_blockNumber)
*_blockNumber = block->block_number;
if (_data)
*_data = mainOnly ? block->parent_data : block->current_data;
if (_unchangedData)
*_unchangedData = block->original_data;
*_cookie = (fssh_addr_t)block;
return FSSH_B_OK;
}
int32_t
fssh_cache_blocks_in_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL)
return FSSH_B_BAD_VALUE;
return transaction->num_blocks;
}
int32_t
fssh_cache_blocks_in_main_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL)
return FSSH_B_BAD_VALUE;
return transaction->main_num_blocks;
}
int32_t
fssh_cache_blocks_in_sub_transaction(void* _cache, int32_t id)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cache_transaction* transaction = lookup_transaction(cache, id);
if (transaction == NULL)
return FSSH_B_BAD_VALUE;
return transaction->sub_num_blocks;
}
bool
fssh_cache_has_block_in_transaction(void* _cache, int32_t id,
fssh_off_t blockNumber)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cached_block* block = (cached_block*)hash_lookup(cache->hash, &blockNumber);
return (block != NULL && block->transaction != NULL
&& block->transaction->id == id);
}
void
fssh_block_cache_delete(void* _cache, bool allowWrites)
{
block_cache* cache = (block_cache*)_cache;
if (allowWrites)
fssh_block_cache_sync(cache);
fssh_mutex_lock(&cache->lock);
uint32_t cookie = 0;
cached_block* block;
while ((block = (cached_block*)hash_remove_first(cache->hash,
&cookie)) != NULL) {
cache->FreeBlock(block);
}
cookie = 0;
cache_transaction* transaction;
while ((transaction = (cache_transaction*)hash_remove_first(
cache->transaction_hash, &cookie)) != NULL) {
delete transaction;
}
delete cache;
}
void*
fssh_block_cache_create(int fd, fssh_off_t numBlocks, fssh_size_t blockSize, bool readOnly)
{
block_cache* cache = new(std::nothrow) block_cache(fd, numBlocks, blockSize,
readOnly);
if (cache == NULL)
return NULL;
if (cache->Init() != FSSH_B_OK) {
delete cache;
return NULL;
}
return cache;
}
fssh_status_t
fssh_block_cache_sync(void* _cache)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
hash_iterator iterator;
hash_open(cache->hash, &iterator);
cached_block* block;
while ((block = (cached_block*)hash_next(cache->hash, &iterator)) != NULL) {
if (block->previous_transaction != NULL
|| (block->transaction == NULL && block->is_dirty)) {
fssh_status_t status = write_cached_block(cache, block);
if (status != FSSH_B_OK)
return status;
}
}
hash_close(cache->hash, &iterator, false);
return FSSH_B_OK;
}
fssh_status_t
fssh_block_cache_sync_etc(void* _cache, fssh_off_t blockNumber,
fssh_size_t numBlocks)
{
block_cache* cache = (block_cache*)_cache;
if (blockNumber < 0 || blockNumber >= cache->max_blocks) {
fssh_panic("block_cache_sync_etc: invalid block number %" FSSH_B_PRIdOFF
" (max %" FSSH_B_PRIdOFF ")", blockNumber, cache->max_blocks - 1);
return FSSH_B_BAD_VALUE;
}
MutexLocker locker(&cache->lock);
for (; numBlocks > 0; numBlocks--, blockNumber++) {
cached_block* block = (cached_block*)hash_lookup(cache->hash,
&blockNumber);
if (block == NULL)
continue;
if (block->previous_transaction != NULL
|| (block->transaction == NULL && block->is_dirty)) {
fssh_status_t status = write_cached_block(cache, block);
if (status != FSSH_B_OK)
return status;
}
}
return FSSH_B_OK;
}
void
fssh_block_cache_discard(void* _cache, fssh_off_t blockNumber,
fssh_size_t numBlocks)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
for (; numBlocks > 0; numBlocks--, blockNumber++) {
cached_block* block = (cached_block*)hash_lookup(cache->hash,
&blockNumber);
if (block == NULL)
continue;
if (block->previous_transaction != NULL)
write_cached_block(cache, block);
if (block->unused) {
cache->unused_blocks.Remove(block);
cache->RemoveBlock(block);
} else {
if (block->transaction != NULL && block->parent_data != NULL
&& block->parent_data != block->current_data) {
fssh_panic("Discarded block %" FSSH_B_PRIdOFF " has already "
"been changed in this transaction!", blockNumber);
}
block->discard = true;
}
}
}
fssh_status_t
fssh_block_cache_make_writable(void* _cache, fssh_off_t blockNumber,
int32_t transaction)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
if (cache->read_only)
fssh_panic("tried to make block writable on a read-only cache!");
void* block;
fssh_status_t status = get_writable_cached_block(cache, blockNumber,
transaction, false, &block);
if (status == FSSH_B_OK) {
put_cached_block((block_cache*)_cache, blockNumber);
return FSSH_B_OK;
}
return status;
}
fssh_status_t
fssh_block_cache_get_writable_etc(void* _cache, fssh_off_t blockNumber,
int32_t transaction, void** _block)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("block_cache_get_writable_etc(block = %lld, transaction = %ld)\n",
blockNumber, transaction));
if (cache->read_only)
fssh_panic("tried to get writable block on a read-only cache!");
return get_writable_cached_block(cache, blockNumber,
transaction, false, _block);
}
void*
fssh_block_cache_get_writable(void* _cache, fssh_off_t blockNumber,
int32_t transaction)
{
void* block;
fssh_status_t status = fssh_block_cache_get_writable_etc(_cache,
blockNumber, transaction, &block);
if (status == FSSH_B_OK)
return block;
return NULL;
}
void*
fssh_block_cache_get_empty(void* _cache, fssh_off_t blockNumber,
int32_t transaction)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
TRACE(("block_cache_get_empty(block = %lld, transaction = %ld)\n",
blockNumber, transaction));
if (cache->read_only)
fssh_panic("tried to get empty writable block on a read-only cache!");
void* block;
if (get_writable_cached_block((block_cache*)_cache, blockNumber,
transaction, true, &block) == FSSH_B_OK)
return block;
return NULL;
}
fssh_status_t
fssh_block_cache_get_etc(void* _cache, fssh_off_t blockNumber, const void** _block)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
bool allocated;
cached_block* block;
fssh_status_t status = get_cached_block(cache, blockNumber, &allocated,
true, &block);
if (status != FSSH_B_OK)
return status;
#ifdef DEBUG_CHANGED
if (block->compare == NULL)
block->compare = cache->Allocate();
if (block->compare != NULL)
memcpy(block->compare, block->current_data, cache->block_size);
#endif
*_block = block->current_data;
return FSSH_B_OK;
}
const void*
fssh_block_cache_get(void* _cache, fssh_off_t blockNumber)
{
const void* block;
if (fssh_block_cache_get_etc(_cache, blockNumber, &block)
== FSSH_B_OK)
return block;
return NULL;
}
fssh_status_t
fssh_block_cache_set_dirty(void* _cache, fssh_off_t blockNumber, bool dirty,
int32_t transaction)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
cached_block* block = (cached_block*)hash_lookup(cache->hash,
&blockNumber);
if (block == NULL)
return FSSH_B_BAD_VALUE;
if (block->is_dirty == dirty) {
return FSSH_B_OK;
}
if (dirty)
fssh_panic("block_cache_set_dirty(): not yet implemented that way!\n");
return FSSH_B_OK;
}
void
fssh_block_cache_put(void* _cache, fssh_off_t blockNumber)
{
block_cache* cache = (block_cache*)_cache;
MutexLocker locker(&cache->lock);
put_cached_block(cache, blockNumber);
}
fssh_status_t
fssh_block_cache_prefetch(void* _cache, fssh_off_t blockNumber, fssh_size_t* _numBlocks)
{
*_numBlocks = 0;
return FSSH_B_UNSUPPORTED;
}