#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <isc/event.h>
#include <isc/task.h>
#include <isc/util.h>
#include "task_p.h"
/*
 * Lifecycle state of a task.
 *
 * idle:    the event queue is empty (the code INSISTs on this whenever it
 *          finds a task idle); task_send(), task_detach() and
 *          task_shutdown() move an idle task to ready.
 * ready:   waiting to be run; dispatch() insists on this state for every
 *          task it pops from the ready queue, and sets it again when a
 *          task uses up its quantum with events still pending.
 * running: set by dispatch() while the task's events are being executed.
 * done:    set by dispatch() once the task has no references, is shutting
 *          down and has no events left; required by task_finished().
 */
typedef enum {
task_state_idle, task_state_ready, task_state_running,
task_state_done
} task_state_t;
/*
 * A task: a queue of events plus the bookkeeping needed to schedule it.
 */
struct isc_task {
/* Manager this task was created on (set in isc_task_create()). */
isc_taskmgr_t * manager;
/* Current lifecycle state, see task_state_t. */
task_state_t state;
/* Reference count: 1 at creation, changed by attach/detach. */
unsigned int references;
/* Pending events, run in queue order by dispatch(). */
isc_eventlist_t events;
/* Events held back until shutdown; task_shutdown() moves them to 'events'. */
isc_eventlist_t on_shutdown;
/* Number of events currently on 'events'. */
unsigned int nevents;
/* Maximum number of events dispatch() runs for this task in one turn. */
unsigned int quantum;
/* Bitmask of TASK_F_* flags. */
unsigned int flags;
/* Timestamp taken by dispatch() just before the task's events are run. */
time_t now;
/* Name set via isc_task_setname(); zero-filled at creation. */
char name[16];
/* Caller-supplied pointer stored by isc_task_setname(). */
void * tag;
/* Link on the manager's list of all tasks ('tasks'). */
LINK(isc_task_t) link;
/* Link on a ready list ('ready_tasks' or dispatch()'s local requeue list). */
LINK(isc_task_t) ready_link;
/* Link on a priority ready list; only used for TASK_F_PRIVILEGED tasks. */
LINK(isc_task_t) ready_priority_link;
};
/* Flag bits stored in isc_task.flags. */
/* Set by task_shutdown() the first time shutdown of the task is started. */
#define TASK_F_SHUTTINGDOWN 0x01
/* Tasks carrying this flag are additionally queued on the priority ready list. */
#define TASK_F_PRIVILEGED 0x02
/* True when shutdown has already been started on task 't'. */
#define TASK_SHUTTINGDOWN(t) (((t)->flags & TASK_F_SHUTTINGDOWN) \
!= 0)
/* List-head type used for the ready queues of tasks. */
typedef ISC_LIST(isc_task_t) isc_tasklist_t;
/*
 * The task manager: owns every task and the queues of tasks that are
 * ready to run.
 */
struct isc_taskmgr {
/* Quantum given to tasks created with quantum == 0. */
unsigned int default_quantum;
/* Every task created on this manager and not yet finished. */
LIST(isc_task_t) tasks;
/* All tasks that are ready to run. */
isc_tasklist_t ready_tasks;
/* The subset of ready tasks that have TASK_F_PRIVILEGED set. */
isc_tasklist_t ready_priority_tasks;
/* Selects which ready list empty_readyq()/pop_readyq() consult. */
isc_taskmgrmode_t mode;
/* Number of tasks dispatch() is currently running. */
unsigned int tasks_running;
/* Counter kept by push_readyq() and dispatch() for queued ready tasks. */
unsigned int tasks_ready;
/* Initialised to 0 in isc_taskmgr_create(); not used elsewhere in this view. */
int pause_requested;
/* Initialised to 0 in isc_taskmgr_create(); not used elsewhere in this view. */
int exclusive_requested;
/* Set by isc_taskmgr_destroy(); isc_task_create() refuses new tasks once set. */
int exiting;
/* Task reference that isc_taskmgr_destroy() detaches if it is non-NULL. */
isc_task_t *excl;
/* Reference count of the shared manager (create increments, destroy decrements). */
unsigned int refs;
};
/* Upper bound on the total number of events run by one dispatch() call. */
#define DEFAULT_TASKMGR_QUANTUM 10
/* Per-task quantum used when isc_taskmgr_create() is given default_quantum == 0. */
#define DEFAULT_DEFAULT_QUANTUM 5
/* True once the manager is exiting and its list of tasks has drained. */
#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
/*
 * The single shared manager instance.  Set by isc_taskmgr_create(), cleared
 * when the manager is destroyed, and used as the fallback when NULL is
 * passed to isc_taskmgr_ready()/isc_taskmgr_dispatch().
 */
static isc_taskmgr_t *taskmgr = NULL;
/*
 * Forward declarations of the ready-queue helpers; push_readyq() is called
 * by task_ready() before its definition appears.
 */
static inline int
empty_readyq(isc_taskmgr_t *manager);
static inline isc_task_t *
pop_readyq(isc_taskmgr_t *manager);
static inline void
push_readyq(isc_taskmgr_t *manager, isc_task_t *task);
/*
 * Dispose of a task that has reached the end of its life.
 *
 * Preconditions (asserted): both event queues are empty, the event count is
 * zero, nobody holds a reference, and the state is 'task_state_done'.
 *
 * The task is taken off its manager's task list and its memory is freed;
 * 'task' must not be used after this call returns.
 */
static void
task_finished(isc_task_t *task) {
	isc_taskmgr_t *manager = task->manager;

	/* The task must be completely drained ... */
	REQUIRE(EMPTY(task->events));
	REQUIRE(task->nevents == 0);
	REQUIRE(EMPTY(task->on_shutdown));

	/* ... unreferenced, and already marked done by the dispatcher. */
	REQUIRE(task->references == 0);
	REQUIRE(task->state == task_state_done);

	UNLINK(manager->tasks, task, link);

	free(task);
}
/*
 * Create a new, idle task on 'manager' and return it through '*taskp'.
 *
 * 'quantum' is the maximum number of events the dispatcher runs for the
 * task in one turn; 0 selects the manager's default quantum.
 *
 * Requires:
 *	'taskp' is non-NULL and '*taskp' is NULL.
 *
 * Returns:
 *	ISC_R_SUCCESS		'*taskp' holds the new task with one reference.
 *	ISC_R_SHUTTINGDOWN	the manager is exiting; nothing was created.
 *	ISC_R_NOMEMORY		allocation failed; nothing was created.
 */
isc_result_t
isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
		isc_task_t **taskp)
{
	isc_task_t *task;

	REQUIRE(taskp != NULL && *taskp == NULL);

	/*
	 * Refuse up front when the manager is going away, rather than
	 * allocating and initialising a task only to free it again.
	 */
	if (manager->exiting)
		return (ISC_R_SHUTTINGDOWN);

	task = malloc(sizeof(*task));
	if (task == NULL)
		return (ISC_R_NOMEMORY);

	task->manager = manager;
	task->state = task_state_idle;
	task->references = 1;
	INIT_LIST(task->events);
	INIT_LIST(task->on_shutdown);
	task->nevents = 0;
	/* A quantum of 0 means "use the manager's default". */
	task->quantum = (quantum != 0) ? quantum : manager->default_quantum;
	task->flags = 0;
	task->now = 0;
	memset(task->name, 0, sizeof(task->name));
	task->tag = NULL;
	INIT_LINK(task, link);
	INIT_LINK(task, ready_link);
	INIT_LINK(task, ready_priority_link);

	/* Register the task with its manager. */
	APPEND(manager->tasks, task, link);

	*taskp = task;
	return (ISC_R_SUCCESS);
}
/*
 * Take an additional reference to task 'source0' and store the task in
 * '*targetp'.
 *
 * Requires:
 *	'targetp' is non-NULL and '*targetp' is NULL.
 */
void
isc_task_attach(isc_task_t *source0, isc_task_t **targetp) {
	REQUIRE(targetp != NULL && *targetp == NULL);

	source0->references += 1;
	*targetp = source0;
}
/*
 * Start shutting down 'task'.  Does nothing if shutdown was already started.
 *
 * Marks the task as shutting down and moves every on-shutdown event onto
 * the regular event queue, taking them from the tail of the on-shutdown
 * list first (i.e. they are posted in LIFO order).
 *
 * Returns 1 if the task was idle and has been switched to the ready state;
 * in that case the caller still has to put it on the ready queue.  Returns
 * 0 otherwise.
 */
static inline int
task_shutdown(isc_task_t *task) {
	int was_idle = 0;
	isc_event_t *event;

	/* Shutdown is a one-shot operation. */
	if (TASK_SHUTTINGDOWN(task))
		return (0);

	task->flags |= TASK_F_SHUTTINGDOWN;

	if (task->state == task_state_idle) {
		INSIST(EMPTY(task->events));
		task->state = task_state_ready;
		was_idle = 1;
	}
	INSIST(task->state == task_state_ready ||
	       task->state == task_state_running);

	/* Drain on_shutdown from the tail so the newest entry is posted first. */
	while ((event = TAIL(task->on_shutdown)) != NULL) {
		DEQUEUE(task->on_shutdown, event, ev_link);
		ENQUEUE(task->events, event, ev_link);
		task->nevents++;
	}

	return (was_idle);
}
/*
 * Put 'task' on its manager's ready queue.
 *
 * Requires:
 *	The task's state has already been set to 'task_state_ready'.
 */
static inline void
task_ready(isc_task_t *task) {
	REQUIRE(task->state == task_state_ready);

	push_readyq(task->manager, task);
}
/*
 * Drop one reference from 'task'.
 *
 * If that was the last reference and the task is idle, the task is switched
 * to the ready state and 1 is returned; the caller must then queue it so
 * the dispatcher can take care of shutting it down.  Returns 0 in every
 * other case.
 *
 * Requires:
 *	The task has at least one reference.
 */
static inline int
task_detach(isc_task_t *task) {
	REQUIRE(task->references > 0);

	task->references--;

	/* Still referenced, or not idle: nothing more to do here. */
	if (task->references != 0 || task->state != task_state_idle)
		return (0);

	INSIST(EMPTY(task->events));
	task->state = task_state_ready;
	return (1);
}
/*
 * Release the caller's reference to '*taskp' and set '*taskp' to NULL.
 *
 * When the detach leaves an idle task without references, the task is
 * placed on the ready queue.
 *
 * Requires:
 *	'taskp' is non-NULL.
 */
void
isc_task_detach(isc_task_t **taskp) {
	isc_task_t *task;

	REQUIRE(taskp != NULL);

	task = *taskp;
	if (task_detach(task))
		task_ready(task);

	*taskp = NULL;
}
/*
 * Append '*eventp' to the event queue of 'task' and set '*eventp' to NULL.
 *
 * Returns 1 if the task was idle and has been switched to the ready state
 * (the caller must then put it on the ready queue), 0 otherwise.
 *
 * Requires:
 *	'eventp' and '*eventp' are non-NULL, the event type is positive,
 *	the event is not linked through 'ev_ratelink', and the task is not
 *	in the done state.
 */
static inline int
task_send(isc_task_t *task, isc_event_t **eventp) {
	isc_event_t *event;
	int was_idle;

	REQUIRE(eventp != NULL);
	event = *eventp;
	REQUIRE(event != NULL);
	REQUIRE(event->ev_type > 0);
	REQUIRE(task->state != task_state_done);
	REQUIRE(!ISC_LINK_LINKED(event, ev_ratelink));

	/* An idle task has to be woken up by the caller. */
	was_idle = (task->state == task_state_idle);
	if (was_idle) {
		INSIST(EMPTY(task->events));
		task->state = task_state_ready;
	}
	INSIST(task->state == task_state_ready ||
	       task->state == task_state_running);

	ENQUEUE(task->events, event, ev_link);
	task->nevents++;

	/* The queue now holds the event; clear the caller's pointer. */
	*eventp = NULL;

	return (was_idle);
}
/*
 * Queue '*eventp' on 'task' (see task_send() for the requirements) and,
 * if the task was idle, place it on the ready queue.  '*eventp' is NULL
 * on return.
 */
void
isc_task_send(isc_task_t *task, isc_event_t **eventp) {
	if (task_send(task, eventp))
		task_ready(task);
}
/*
 * Queue '*eventp' on '*taskp' and then drop the caller's reference to the
 * task.  Both '*eventp' and '*taskp' are NULL on return.
 *
 * The task is put on the ready queue if either step found it idle.  Both
 * cannot report that: once the send has made the task ready, the detach no
 * longer sees an idle task.
 *
 * Requires:
 *	'taskp' is non-NULL (plus the requirements of task_send()).
 */
void
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
	isc_task_t *task;
	int idle1;
	int idle2;

	REQUIRE(taskp != NULL);
	task = *taskp;

	idle1 = task_send(task, eventp);
	idle2 = task_detach(task);
	INSIST(!(idle1 && idle2));

	if (idle1 || idle2)
		task_ready(task);

	*taskp = NULL;
}
/* True unless the event carries the ISC_EVENTATTR_NOPURGE attribute. */
#define PURGE_OK(event) (((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0)

/*
 * Move matching events from the queue of 'task' to the tail of '*events'.
 *
 * An event matches when
 *	- its type lies in ['first', 'last'],
 *	- 'sender' is NULL or equals the event's sender,
 *	- 'tag' is NULL or equals the event's tag, and
 *	- 'purging' is zero or the event may be purged (see PURGE_OK).
 *
 * Returns the number of events moved.
 *
 * Requires:
 *	'last' >= 'first'.
 */
static unsigned int
dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first,
	       isc_eventtype_t last, void *tag,
	       isc_eventlist_t *events, int purging)
{
	isc_event_t *event, *next_event;
	unsigned int count = 0;

	REQUIRE(last >= first);

	for (event = HEAD(task->events); event != NULL; event = next_event) {
		/* Remember the successor before the event may be unlinked. */
		next_event = NEXT(event, ev_link);

		if (event->ev_type < first || event->ev_type > last)
			continue;
		if (sender != NULL && event->ev_sender != sender)
			continue;
		if (tag != NULL && event->ev_tag != tag)
			continue;
		if (purging && !PURGE_OK(event))
			continue;

		DEQUEUE(task->events, event, ev_link);
		task->nevents--;
		ENQUEUE(*events, event, ev_link);
		count++;
	}

	return (count);
}
/*
 * Remove and free every purgeable event queued on 'task' whose type lies in
 * ['first', 'last'] and which matches 'sender' and 'tag' (NULL matches
 * anything for either of them).
 *
 * Returns the number of events that were purged.
 */
unsigned int
isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first,
		    isc_eventtype_t last, void *tag)
{
	isc_eventlist_t events;
	isc_event_t *event;
	unsigned int count;

	ISC_LIST_INIT(events);

	/* Collect the victims on a private list first ... */
	count = dequeue_events(task, sender, first, last, tag, &events, 1);

	/* ... then release them one by one from the front of that list. */
	while ((event = HEAD(events)) != NULL) {
		ISC_LIST_UNLINK(events, event, ev_link);
		isc_event_free(&event);
	}

	return (count);
}
/*
 * Give 'task' a name and an associated tag pointer.
 *
 * 'name' is copied into the task's fixed-size name buffer with strlcpy(),
 * bounded by the size of that buffer; 'tag' is stored as is.
 */
void
isc_task_setname(isc_task_t *task, const char *name, void *tag) {
	task->tag = tag;
	strlcpy(task->name, name, sizeof(task->name));
}
/*
 * Report whether the ready queue selected by the manager's current mode is
 * empty: 'ready_tasks' in normal mode, 'ready_priority_tasks' otherwise.
 *
 * Returns non-zero when that queue is empty.
 */
static inline int
empty_readyq(isc_taskmgr_t *manager) {
	if (manager->mode == isc_taskmgrmode_normal)
		return (EMPTY(manager->ready_tasks));

	return (EMPTY(manager->ready_priority_tasks));
}
/*
 * Remove and return the next task to run, or NULL if there is none.
 *
 * The candidate is the head of 'ready_tasks' in normal mode and the head of
 * 'ready_priority_tasks' otherwise.  Whichever list it came from, the task
 * is taken off 'ready_tasks' and, if it is linked there, off
 * 'ready_priority_tasks' as well.
 */
static inline isc_task_t *
pop_readyq(isc_taskmgr_t *manager) {
	isc_task_t *task;

	task = (manager->mode == isc_taskmgrmode_normal)
		       ? HEAD(manager->ready_tasks)
		       : HEAD(manager->ready_priority_tasks);
	if (task == NULL)
		return (NULL);

	DEQUEUE(manager->ready_tasks, task, ready_link);
	if (ISC_LINK_LINKED(task, ready_priority_link)) {
		DEQUEUE(manager->ready_priority_tasks, task,
			ready_priority_link);
	}

	return (task);
}
/*
 * Append 'task' to the manager's ready queue and bump the ready counter.
 * Tasks flagged TASK_F_PRIVILEGED are appended to the priority ready queue
 * as well.
 */
static inline void
push_readyq(isc_taskmgr_t *manager, isc_task_t *task) {
	int privileged = ((task->flags & TASK_F_PRIVILEGED) != 0);

	ENQUEUE(manager->ready_tasks, task, ready_link);
	if (privileged) {
		ENQUEUE(manager->ready_priority_tasks, task,
			ready_priority_link);
	}

	manager->tasks_ready++;
}
/*
 * Run ready tasks on 'manager' until the ready queue selected by the current
 * mode is empty, the manager is finished (exiting with no tasks left), or
 * DEFAULT_TASKMGR_QUANTUM events in total have been run by this call.
 *
 * Each popped task runs events until its queue is empty or it has run
 * 'quantum' events.  A task that exhausts its quantum with events still
 * pending is collected on a local list and only appended to the manager's
 * ready queue(s) after the loop, so it is not popped again during this call.
 */
static void
dispatch(isc_taskmgr_t *manager) {
isc_task_t *task;
/* Events run so far by this call, across all tasks. */
unsigned int total_dispatch_count = 0;
/* Tasks that used up their quantum; re-queued on the manager at the end. */
isc_tasklist_t new_ready_tasks;
isc_tasklist_t new_priority_tasks;
/* Number of tasks placed on new_ready_tasks. */
unsigned int tasks_ready = 0;
ISC_LIST_INIT(new_ready_tasks);
ISC_LIST_INIT(new_priority_tasks);
while (!FINISHED(manager)) {
/* Stop when the per-call event budget is spent or nothing is ready. */
if (total_dispatch_count >= DEFAULT_TASKMGR_QUANTUM ||
empty_readyq(manager))
break;
task = pop_readyq(manager);
if (task != NULL) {
/* Events run for this task during this turn. */
unsigned int dispatch_count = 0;
int done = 0;
/* Set when the task still has events after using up its quantum. */
int requeue = 0;
/* Set when the task has reached task_state_done and must be freed. */
int finished = 0;
isc_event_t *event;
manager->tasks_ready--;
manager->tasks_running++;
INSIST(task->state == task_state_ready);
task->state = task_state_running;
time(&task->now);
do {
if (!EMPTY(task->events)) {
/*
 * Unlink the head event before running its action.
 * NOTE(review): nothing here frees the event; presumably the
 * action takes care of it - confirm against the event API.
 */
event = HEAD(task->events);
DEQUEUE(task->events, event, ev_link);
task->nevents--;
if (event->ev_action != NULL) {
(event->ev_action)(
(isc_task_t *)task,
event);
}
dispatch_count++;
total_dispatch_count++;
}
/*
 * Nobody references the task and it has nothing left to do: start
 * its shutdown, which may move on-shutdown events onto the event
 * queue.  The task is running here, so task_shutdown() cannot
 * report it as having been idle.
 */
if (task->references == 0 &&
EMPTY(task->events) &&
!TASK_SHUTTINGDOWN(task)) {
int was_idle;
was_idle = task_shutdown(task);
INSIST(!was_idle);
}
if (EMPTY(task->events)) {
/*
 * Queue drained: the task is either completely done (no
 * references and shutting down) or simply goes back to idle.
 */
if (task->references == 0 &&
TASK_SHUTTINGDOWN(task)) {
finished = 1;
task->state = task_state_done;
} else
task->state = task_state_idle;
done = 1;
} else if (dispatch_count >= task->quantum) {
/* Quantum used up with events still pending: run it again later. */
task->state = task_state_ready;
requeue = 1;
done = 1;
}
} while (!done);
/*
 * task_finished() frees the task.  'finished' and 'requeue' are set
 * in mutually exclusive branches above, so the task is not touched
 * again after it has been freed.
 */
if (finished)
task_finished(task);
manager->tasks_running--;
if (requeue) {
ENQUEUE(new_ready_tasks, task, ready_link);
if ((task->flags & TASK_F_PRIVILEGED) != 0)
ENQUEUE(new_priority_tasks, task,
ready_priority_link);
tasks_ready++;
}
}
}
/* Hand the re-queued tasks back to the manager and account for them. */
ISC_LIST_APPENDLIST(manager->ready_tasks, new_ready_tasks, ready_link);
ISC_LIST_APPENDLIST(manager->ready_priority_tasks, new_priority_tasks,
ready_priority_link);
manager->tasks_ready += tasks_ready;
/* With the selected queue empty, fall back to (or stay in) normal mode. */
if (empty_readyq(manager))
manager->mode = isc_taskmgrmode_normal;
}
/*
 * Release the memory of 'manager' and clear the file-level shared manager
 * pointer.
 */
static void
manager_free(isc_taskmgr_t *manager) {
	taskmgr = NULL;
	free(manager);
}
/*
 * Create the task manager, or attach to the one that already exists.
 *
 * The manager is shared: if one has already been created, its reference
 * count is incremented and it is returned through '*managerp'; 'workers'
 * and 'default_quantum' are not applied in that case.  Otherwise a new
 * manager is allocated with one reference.  A 'default_quantum' of 0
 * selects DEFAULT_DEFAULT_QUANTUM.
 *
 * Requires:
 *	'workers' > 0 (the value is only validated here).
 *	'managerp' is non-NULL and '*managerp' is NULL.
 *
 * Returns:
 *	ISC_R_SUCCESS		'*managerp' holds the manager.
 *	ISC_R_SHUTTINGDOWN	a shared manager exists but its reference
 *				count has already dropped to zero.
 *	ISC_R_NOMEMORY		allocation failed.
 */
isc_result_t
isc_taskmgr_create(unsigned int workers,
		   unsigned int default_quantum, isc_taskmgr_t **managerp)
{
	isc_taskmgr_t *manager;

	REQUIRE(workers > 0);
	REQUIRE(managerp != NULL && *managerp == NULL);

	/* Reuse the shared manager when there is one. */
	if (taskmgr != NULL) {
		if (taskmgr->refs == 0)
			return (ISC_R_SHUTTINGDOWN);
		taskmgr->refs++;
		*managerp = taskmgr;
		return (ISC_R_SUCCESS);
	}

	manager = malloc(sizeof(*manager));
	if (manager == NULL)
		return (ISC_R_NOMEMORY);

	manager->mode = isc_taskmgrmode_normal;
	if (default_quantum == 0)
		default_quantum = DEFAULT_DEFAULT_QUANTUM;
	manager->default_quantum = default_quantum;
	INIT_LIST(manager->tasks);
	INIT_LIST(manager->ready_tasks);
	INIT_LIST(manager->ready_priority_tasks);
	manager->tasks_running = 0;
	manager->tasks_ready = 0;
	manager->exclusive_requested = 0;
	manager->pause_requested = 0;
	manager->exiting = 0;
	manager->excl = NULL;
	manager->refs = 1;

	/* Publish the new manager as the shared instance. */
	taskmgr = manager;

	*managerp = manager;
	return (ISC_R_SUCCESS);
}
/*
 * Drop the caller's reference to '*managerp' and set '*managerp' to NULL.
 *
 * While other references remain, nothing else happens.  When the last
 * reference goes away the manager is torn down:
 *	- the 'excl' task reference, if any, is detached;
 *	- the manager is marked as exiting and forced into normal mode, so
 *	  that the ready-queue helpers look at the list of all ready tasks;
 *	- shutdown is started on every task, queueing those that were idle;
 *	- the dispatcher is run until no task is ready any more;
 *	- the manager, which must have no tasks left by then, is freed.
 *
 * Requires:
 *	'managerp' is non-NULL.
 */
void
isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
	isc_taskmgr_t *manager;
	isc_task_t *task;

	REQUIRE(managerp != NULL);
	manager = *managerp;

	manager->refs--;
	if (manager->refs > 0) {
		/* Still in use elsewhere; only the caller's handle goes away. */
		*managerp = NULL;
		return;
	}

	if (manager->excl != NULL)
		isc_task_detach(&manager->excl);

	INSIST(!manager->exiting);
	manager->exiting = 1;
	manager->mode = isc_taskmgrmode_normal;

	/* Post shutdown events to every task; wake the ones that were idle. */
	for (task = HEAD(manager->tasks);
	     task != NULL;
	     task = NEXT(task, link)) {
		if (task_shutdown(task))
			push_readyq(manager, task);
	}

	/* Run the shutdown events to completion. */
	while (isc_taskmgr_ready(manager))
		(void)isc_taskmgr_dispatch(manager);

	INSIST(ISC_LIST_EMPTY(manager->tasks));

	taskmgr = NULL;
	manager_free(manager);

	*managerp = NULL;
}
/*
 * Report whether a task is ready to run.
 *
 * A NULL 'manager' selects the shared manager.  Returns 0 when there is no
 * manager at all, otherwise non-zero if the ready queue selected by the
 * manager's current mode is not empty.
 */
int
isc_taskmgr_ready(isc_taskmgr_t *manager) {
	isc_taskmgr_t *target = (manager != NULL) ? manager : taskmgr;

	if (target == NULL)
		return (0);

	return (!empty_readyq(target));
}
/*
 * Run one round of the dispatcher.
 *
 * A NULL 'manager' selects the shared manager.
 *
 * Returns:
 *	ISC_R_SUCCESS	the dispatcher was run.
 *	ISC_R_NOTFOUND	there is no manager to run.
 */
isc_result_t
isc_taskmgr_dispatch(isc_taskmgr_t *manager) {
	isc_taskmgr_t *target = (manager != NULL) ? manager : taskmgr;

	if (target == NULL)
		return (ISC_R_NOTFOUND);

	dispatch(target);
	return (ISC_R_SUCCESS);
}