#include "streams_mmap.h"
#include "streams_common.h"
/*
 * stream_mmap_prime() - position an mmap-backed stream on its first line.
 *
 * If the stream is already primed nothing is done.  Otherwise the stream is
 * flagged STREAM_PRIMED, the current line record and the release origin are
 * pointed at the start of the buffer, and the line length is set to the
 * offset of the first '\n' (or to the whole buffer when there is none, in
 * which case WMSG_NEWLINE_ADDED is reported).
 *
 * Returns PRIME_SUCCEEDED, or PRIME_FAILED_EMPTY_FILE when the buffer size
 * is zero (the stream is then also flagged STREAM_EOS_REACHED).
 */
static int
stream_mmap_prime(stream_t *str)
{
	char *eol;

	if (stream_is_primed(str))
		return (PRIME_SUCCEEDED);

	/* The primed flag is raised even if the buffer proves to be empty. */
	stream_set(str, STREAM_PRIMED);

	if (str->s_buffer_size == 0) {
		stream_set(str, STREAM_EOS_REACHED);
		return (PRIME_FAILED_EMPTY_FILE);
	}

	str->s_current.l_data.sp = str->s_buffer;
	str->s_type.SF.s_release_origin = str->s_buffer;

	eol = (char *)memchr(str->s_buffer, '\n', str->s_buffer_size);
	if (eol != NULL) {
		/* The line length excludes the newline itself. */
		str->s_current.l_data_length = eol - (char *)str->s_buffer;
	} else {
		/* No newline anywhere: the whole buffer is a single line. */
		warn(WMSG_NEWLINE_ADDED, str->s_filename);
		str->s_current.l_data_length = str->s_buffer_size;
	}

	/* No collation data is attached to the line at this point. */
	str->s_current.l_collate.sp = NULL;
	str->s_current.l_collate_length = 0;

	__S(stats_incr_fetches());
	return (PRIME_SUCCEEDED);
}
/*
 * stream_mmap_fetch() - advance an mmap-backed stream to its next line.
 *
 * The stream must already be primed and must not have STREAM_EOS_REACHED
 * set; both conditions are asserted.  The current line record is moved past
 * the previous line and its one-byte terminator, and the new line length is
 * the distance to the next '\n' or, when none remains, to the end of the
 * buffer (WMSG_NEWLINE_ADDED is reported in that case).
 *
 * STREAM_EOS_REACHED is set when the new line, plus one byte for its
 * terminator, uses up everything that is left in the buffer.
 *
 * Always returns NEXT_LINE_COMPLETE.
 */
static ssize_t
stream_mmap_fetch(stream_t *str)
{
	ssize_t dist_to_buf_end;
	char *next_nl;

	ASSERT(stream_is_primed(str));
	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);

	/* Step over the previous line; the extra byte is its newline. */
	str->s_current.l_data.sp = str->s_current.l_data.sp +
	    str->s_current.l_data_length + 1;

	/* Number of bytes between the new line start and the buffer end. */
	dist_to_buf_end = str->s_buffer_size - (str->s_current.l_data.sp
	    - (char *)str->s_buffer);
	ASSERT(dist_to_buf_end >= 0 && dist_to_buf_end <= str->s_buffer_size);

	next_nl = memchr(str->s_current.l_data.sp, '\n', dist_to_buf_end);

	if (next_nl != NULL) {
		str->s_current.l_data_length = next_nl
		    - str->s_current.l_data.sp;
	} else {
		/* Unterminated final line: it runs to the end of the buffer. */
		warn(WMSG_NEWLINE_ADDED, str->s_filename);
		str->s_current.l_data_length = dist_to_buf_end;
	}

	/*
	 * End of stream is reached when this line and its (possibly absent)
	 * newline consume the rest of the buffer.  The test compares byte
	 * counts instead of forming "line start + length + 1" as an address:
	 * for an unterminated final line that address would lie beyond one
	 * past the end of the buffer, which is undefined pointer arithmetic.
	 */
	if (str->s_current.l_data_length + 1 >= dist_to_buf_end)
		stream_set(str, STREAM_EOS_REACHED);

	str->s_current.l_collate_length = 0;

	__S(stats_incr_fetches());
	return (NEXT_LINE_COMPLETE);
}
/*
 * stream_mmap_is_closable() - report whether the stream can be closed.
 *
 * Returns 1 when STREAM_OPEN is set in the stream's status, 0 otherwise.
 */
static int
stream_mmap_is_closable(stream_t *str)
{
	return ((str->s_status & STREAM_OPEN) ? 1 : 0);
}
/*
 * stream_mmap_close() - close the file descriptor behind an mmap stream.
 *
 * When the stream holds a descriptor greater than -1, the descriptor is
 * closed (the result of close() is deliberately ignored), STREAM_OPEN is
 * cleared, and 1 is returned.  Otherwise nothing is done and 0 is returned.
 */
static int
stream_mmap_close(stream_t *str)
{
	/* No usable descriptor recorded: nothing to close. */
	if (str->s_type.SF.s_fd <= -1)
		return (0);

	(void) close(str->s_type.SF.s_fd);
	stream_unset(str, STREAM_OPEN);

	return (1);
}
/*
 * stream_mmap_free() - unmap the buffer backing an mmap stream.
 *
 * Returns 0, leaving the stream untouched, when the stream is not flagged
 * STREAM_OPEN or when it has a consumer flagged STREAM_NOT_FREEABLE.
 * Returns 1 when there is no buffer to release, or after the buffer has
 * been unmapped; in the latter case the buffer pointer and size are reset
 * and STREAM_PRIMED is cleared.  A munmap() failure is reported through
 * die() with EMSG_MUNMAP.
 */
static int
stream_mmap_free(stream_t *str)
{
	/* Only streams flagged open are eligible. */
	if (!(str->s_status & STREAM_OPEN))
		return (0);

	/* A consumer may forbid freeing of this stream. */
	if (str->s_consumer != NULL &&
	    (str->s_consumer->s_status & STREAM_NOT_FREEABLE))
		return (0);

	if (str->s_buffer != NULL) {
		if (munmap(str->s_buffer, str->s_buffer_size) < 0)
			die(EMSG_MUNMAP, str->s_filename);

		str->s_buffer = NULL;
		str->s_buffer_size = 0;

		/* With the mapping gone the stream is no longer primed. */
		stream_unset(str, STREAM_PRIMED);
	}

	return (1);
}
/*
 * stream_mmap_eos() - test whether an mmap stream is at end of stream.
 *
 * Returns 1 for a NULL stream or one already flagged STREAM_EOS_REACHED.
 * Otherwise end of stream is detected, and STREAM_EOS_REACHED is set, when
 * either the recorded file size is zero, or the stream is primed and the
 * current line plus one byte for its newline extends to or beyond the end
 * of the buffer.  Returns 1 in those cases and 0 in all others.
 */
static int
stream_mmap_eos(stream_t *str)
{
	int at_end;

	if (str == NULL || (str->s_status & STREAM_EOS_REACHED))
		return (1);

	if (str->s_filesize == 0) {
		at_end = 1;
	} else if (stream_is_primed(str)) {
		/*
		 * Offset of the current line within the buffer, plus its
		 * length, plus one byte for the newline.
		 */
		at_end = (str->s_current.l_data.sp - (char *)str->s_buffer +
		    str->s_current.l_data_length + 1 >= str->s_buffer_size);
	} else {
		/* The position check is only made on a primed stream. */
		at_end = 0;
	}

	if (at_end)
		stream_set(str, STREAM_EOS_REACHED);

	return (at_end);
}
#define ALIGNED (~(ulong_t)(PAGESIZE - 1))

/*
 * stream_mmap_release_line() - advise the system that already-consumed
 * parts of the mapping are no longer needed.
 *
 * The address of the current line is masked with ALIGNED (rounding it down
 * to a PAGESIZE boundary, assuming PAGESIZE is a power of two).  The region
 * from the stream's release origin up to that boundary is then released
 * with madvise(MADV_DONTNEED) in whole multiples of DEFAULT_RELEASE_SIZE,
 * and the release origin is advanced by the amount released.  Nothing
 * happens until at least DEFAULT_RELEASE_SIZE bytes are available.  A
 * madvise() failure only produces a warning.
 */
static void
stream_mmap_release_line(stream_t *str)
{
	caddr_t base = str->s_type.SF.s_release_origin;
	caddr_t page = (caddr_t)((ulong_t)str->s_current.l_data.sp & ALIGNED);
	size_t nbytes;

	/* Grow the span in DEFAULT_RELEASE_SIZE steps while it still fits. */
	for (nbytes = 0; page - (base + nbytes) >= DEFAULT_RELEASE_SIZE;
	    nbytes += DEFAULT_RELEASE_SIZE)
		continue;

	if (nbytes != 0) {
		if (madvise(base, nbytes, MADV_DONTNEED) == -1)
			warn(gettext("madvise failed"));

		/* The origin advances even when madvise() reported failure. */
		str->s_type.SF.s_release_origin += nbytes;
	}
}
/*
 * Operations table for mmap-backed streams.
 *
 * The initializer is positional, so the order of the entries must match the
 * member order of stream_ops_t (declared elsewhere).  The functions defined
 * in this file supply the is-closable, close, eos, fetch, free, prime and
 * release-line operations; the final entry reuses stream_stdio_unlink.
 * NOTE(review): the NULL entries presumably mark operations that mmap
 * streams do not provide -- confirm against the stream_ops_t declaration.
 */
const stream_ops_t stream_mmap_ops = {
stream_mmap_is_closable,
stream_mmap_close,
stream_mmap_eos,
stream_mmap_fetch,
NULL,
stream_mmap_free,
NULL,
stream_mmap_prime,
NULL,
stream_mmap_release_line,
NULL,
stream_stdio_unlink
};