#include "config.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap_fstrm.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#include "services/outside_network.h"
#include "sldns/sbuffer.h"
#ifdef HAVE_SYS_UN_H
#include <sys/un.h>
#endif
#include <fcntl.h>
#ifdef HAVE_OPENSSL_SSL_H
#include <openssl/ssl.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
#include <openssl/err.h>
#endif
/** upper bound on the number of loop iterations (messages) handled by one
 * invocation of dtio_output_cb */
#define DTIO_MESSAGES_PER_CALLBACK 100
/** value, in msec, stored as the next reconnect timeout when the current
 * reconnect timeout is 0 (see dtio_reconnect_enable) */
#define DTIO_RECONNECT_TIMEOUT_MIN 10
/** cap, in msec, applied to the doubled reconnect timeout */
#define DTIO_RECONNECT_TIMEOUT_MAX 1000
/** reconnect timeout, in msec, passed to dtio_reconnect_slow after a failed
 * handshake or a rejected/invalid ACCEPT exchange */
#define DTIO_RECONNECT_TIMEOUT_SLOW 1000
/** queue message count at which dt_msg_queue_submit asks for an immediate
 * wakeup of the io thread */
#define DTIO_MSG_FOR_WAKEUP 32
/** largest frame length, in bytes, that dtio_read_accept_frame accepts;
 * longer frames cause the connection to be closed */
#define DTIO_RECV_FRAME_MAX_LEN 1000
struct stop_flush_info;
enum {
DTIO_COMMAND_STOP = 0,
DTIO_COMMAND_WAKEUP = 1
} dtio_channel_command;
/* Forward declarations of static helpers that are referenced before their
 * definition appears. */
static void dtio_open_output(struct dt_io_thread* dtio);
static int dtio_add_output_event_write(struct dt_io_thread* dtio);
static void dtio_reconnect_enable(struct dt_io_thread* dtio);
static void dtio_stop_flush_exit(struct stop_flush_info* info);
static int dtio_control_start_send(struct dt_io_thread* dtio);
#ifdef HAVE_SSL
/* the SSL brief read/write helpers only exist when SSL is compiled in */
static int dtio_enable_brief_read(struct dt_io_thread* dtio);
static int dtio_enable_brief_write(struct dt_io_thread* dtio);
#endif
/**
 * Allocate a message queue with a 1 MiB size limit and a wakeup timer
 * registered on the given comm base.
 * @param base: comm base handed to comm_timer_create for the wakeup timer.
 * @return the new queue, or NULL if allocation of the queue or of its
 *	timer failed.
 */
struct dt_msg_queue*
dt_msg_queue_create(struct comm_base* base)
{
	struct dt_msg_queue* queue;

	queue = calloc(1, sizeof(*queue));
	if(queue == NULL)
		return NULL;
	queue->maxsize = 1*1024*1024;
	queue->wakeup_timer = comm_timer_create(base, mq_wakeup_cb, queue);
	if(queue->wakeup_timer != NULL) {
		/* only set up the lock once nothing can fail anymore */
		lock_basic_init(&queue->lock);
		lock_protect(&queue->lock, queue, sizeof(*queue));
		return queue;
	}
	free(queue);
	return NULL;
}
/**
 * Free every entry (and its message buffer) in the queue and reset the
 * queue bookkeeping to empty. Does not take the queue lock itself.
 * @param mq: the queue to empty.
 */
static void
dt_msg_queue_clear(struct dt_msg_queue* mq)
{
	struct dt_msg_entry* cur = mq->first;

	while(cur != NULL) {
		/* grab the successor before the entry is released */
		struct dt_msg_entry* following = cur->next;
		free(cur->buf);
		free(cur);
		cur = following;
	}
	mq->msgcount = 0;
	mq->cursize = 0;
	mq->last = NULL;
	mq->first = NULL;
}
/**
 * Destroy a message queue: its lock, all queued messages, its wakeup timer
 * and the queue structure itself.
 * @param mq: queue to delete; NULL is accepted and ignored.
 */
void
dt_msg_queue_delete(struct dt_msg_queue* mq)
{
	if(mq != NULL) {
		lock_basic_destroy(&mq->lock);
		dt_msg_queue_clear(mq);
		comm_timer_delete(mq->wakeup_timer);
		free(mq);
	}
}
/**
 * Send a wakeup command byte over the command pipe to the io thread.
 * Retries while the write reports a temporary condition; any other write
 * failure is logged and the attempt is abandoned.
 * @param dtio: the io thread; nothing is done when it is NULL or has not
 *	been started.
 */
static void dtio_wakeup(struct dt_io_thread* dtio)
{
	uint8_t cmd = DTIO_COMMAND_WAKEUP;

	if(!dtio || !dtio->started)
		return;
	for(;;) {
		if(write(dtio->commandpipe[1], &cmd, sizeof(cmd)) != -1)
			return;
		/* the write failed, retry on temporary errors */
#ifndef USE_WINSOCK
		if(errno == EINTR || errno == EAGAIN)
			continue;
#else
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			continue;
#endif
		log_err("dnstap io wakeup: write: %s",
			sock_strerror(errno));
		return;
	}
}
/**
 * Wakeup timer callback for a message queue: marks the wakeup timer as no
 * longer enabled (under the wakeup timer lock) and wakes the io thread.
 * @param arg: the struct dt_msg_queue the timer belongs to.
 */
void
mq_wakeup_cb(void* arg)
{
	struct dt_msg_queue* queue = (struct dt_msg_queue*)arg;

	lock_basic_lock(&queue->dtio->wakeup_timer_lock);
	queue->dtio->wakeup_timer_enabled = 0;
	lock_basic_unlock(&queue->dtio->wakeup_timer_lock);

	dtio_wakeup(queue->dtio);
}
/**
 * Arm the wakeup timer of the queue, under the wakeup timer lock.
 * With wakeupnow set, the timer is always (re)set to fire immediately.
 * Without it, a one second timer is started only when the wakeup was not
 * already flagged as enabled and the timer is not currently set.
 * @param mq: the message queue, with mq->dtio set.
 * @param wakeupnow: nonzero to request an immediate wakeup.
 */
static void
dt_msg_queue_start_timer(struct dt_msg_queue* mq, int wakeupnow)
{
	struct timeval tv = {0};
	int was_enabled;

	lock_basic_lock(&mq->dtio->wakeup_timer_lock);
	was_enabled = mq->dtio->wakeup_timer_enabled;
	if(!was_enabled)
		mq->dtio->wakeup_timer_enabled = 1;
	if(wakeupnow) {
		/* zero timeout: fire as soon as possible */
		tv.tv_sec = 0;
		tv.tv_usec = 0;
		comm_timer_set(mq->wakeup_timer, &tv);
	} else if(!was_enabled) {
		/* lazy wakeup, leave an already pending timer alone */
		tv.tv_sec = 1;
		tv.tv_usec = 0;
		if(!comm_timer_is_set(mq->wakeup_timer))
			comm_timer_set(mq->wakeup_timer, &tv);
	}
	lock_basic_unlock(&mq->dtio->wakeup_timer_lock);
}
/**
 * Hand a message buffer to the queue. The queue takes ownership of buf:
 * it is either linked into the queue or freed here when it cannot be
 * queued (zero length, no queue, out of memory, queue full).
 * After queueing, the wakeup timer is started when the queue was empty,
 * the io thread is not in write mode, the message count hit
 * DTIO_MSG_FOR_WAKEUP, or the queue crosses 9/10 of its maximum size.
 * @param mq: the message queue, may be NULL.
 * @param buf: malloced message; NULL is ignored.
 * @param len: length of buf in bytes.
 */
void
dt_msg_queue_submit(struct dt_msg_queue* mq, void* buf, size_t len)
{
	int wakeupnow, wakeupstarttimer;
	struct dt_msg_entry* entry;

	if(!buf)
		return;
	if(len == 0 || !mq) {
		/* nothing to queue, or nowhere to queue it */
		free(buf);
		return;
	}

	entry = malloc(sizeof(*entry));
	if(!entry) {
		log_err("out of memory logging dnstap");
		free(buf);
		return;
	}
	entry->len = len;
	entry->buf = buf;
	entry->next = NULL;

	lock_basic_lock(&mq->lock);
	/* an eventual wakeup is needed if the list was empty or the io
	 * thread is not writing at the moment */
	wakeupstarttimer = (mq->first == NULL ||
		!mq->dtio->event_added_is_write);
	/* wake up right away at the message count threshold or when this
	 * message takes the queue across 90% of its capacity */
	wakeupnow = (mq->msgcount == DTIO_MSG_FOR_WAKEUP ||
		(mq->cursize < mq->maxsize * 9 / 10 &&
		mq->cursize+len >= mq->maxsize * 9 / 10));
	if(mq->cursize + len > mq->maxsize) {
		/* does not fit, drop the message */
		lock_basic_unlock(&mq->lock);
		free(buf);
		free(entry);
		return;
	}
	if(mq->last == NULL)
		mq->first = entry;
	else
		mq->last->next = entry;
	mq->last = entry;
	mq->cursize += len;
	mq->msgcount ++;
	lock_basic_unlock(&mq->lock);

	if(wakeupnow || wakeupstarttimer)
		dt_msg_queue_start_timer(mq, wakeupnow);
}
/**
 * Allocate a zero-initialised dnstap io thread structure and set up its
 * wakeup timer lock.
 * @return the new structure, or NULL when out of memory.
 */
struct dt_io_thread* dt_io_thread_create(void)
{
	struct dt_io_thread* dtio = calloc(1, sizeof(*dtio));
	/* do not touch the lock member of a failed allocation */
	if(!dtio)
		return NULL;
	lock_basic_init(&dtio->wakeup_timer_lock);
	lock_protect(&dtio->wakeup_timer_lock, &dtio->wakeup_timer_enabled,
		sizeof(dtio->wakeup_timer_enabled));
	return dtio;
}
/**
 * Free the io thread structure: its wakeup timer lock, the list items of
 * registered queues (not the queues themselves), the configuration strings
 * and, when built with SSL, the SSL context.
 * @param dtio: structure to delete; NULL is accepted and ignored.
 */
void dt_io_thread_delete(struct dt_io_thread* dtio)
{
	struct dt_io_list_item* cur;
	struct dt_io_list_item* following;

	if(dtio == NULL)
		return;
	lock_basic_destroy(&dtio->wakeup_timer_lock);
	for(cur = dtio->io_list; cur != NULL; cur = following) {
		following = cur->next;
		free(cur);
	}
	free(dtio->socket_path);
	free(dtio->ip_str);
	free(dtio->tls_server_name);
	free(dtio->client_key_file);
	free(dtio->client_cert_file);
#ifdef HAVE_SSL
	if(dtio->ssl_ctx)
		SSL_CTX_free(dtio->ssl_ctx);
#endif
	free(dtio);
}
/**
 * Copy the dnstap settings from the configuration into the io thread
 * structure: the upstream kind (unix socket, TCP or TLS), the socket path
 * or IP string, and for TLS the server name, client key/cert files and the
 * SSL context.
 * Strings that are replaced are freed first, so the function does not leak
 * when it is applied to a structure that already holds values.
 * @param dtio: io thread structure to fill in.
 * @param cfg: configuration to read from.
 * @return 1 on success, 0 on failure (dnstap disabled, missing settings,
 *	allocation failure, or SSL context setup failure).
 */
int dt_io_thread_apply_cfg(struct dt_io_thread* dtio, struct config_file *cfg)
{
	if(!cfg->dnstap) {
		log_warn("cannot setup dnstap because dnstap-enable is no");
		return 0;
	}

	/* what type of connectivity do we have */
	if(cfg->dnstap_ip && cfg->dnstap_ip[0]) {
		if(cfg->dnstap_tls)
			dtio->upstream_is_tls = 1;
		else	dtio->upstream_is_tcp = 1;
	} else {
		dtio->upstream_is_unix = 1;
	}
	dtio->is_bidirectional = cfg->dnstap_bidirectional;

	if(dtio->upstream_is_unix) {
		char* nm;
		if(!cfg->dnstap_socket_path ||
			cfg->dnstap_socket_path[0]==0) {
			log_err("dnstap setup: no dnstap-socket-path for "
				"socket connect");
			return 0;
		}
		nm = cfg->dnstap_socket_path;
		/* strip a leading chroot directory from the path */
		if(cfg->chrootdir && cfg->chrootdir[0] && strncmp(nm,
			cfg->chrootdir, strlen(cfg->chrootdir)) == 0)
			nm += strlen(cfg->chrootdir);
		free(dtio->socket_path);
		dtio->socket_path = strdup(nm);
		if(!dtio->socket_path) {
			log_err("dnstap setup: malloc failure");
			return 0;
		}
	}

	if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
		if(!cfg->dnstap_ip || cfg->dnstap_ip[0] == 0) {
			log_err("dnstap setup: no dnstap-ip for TCP connect");
			return 0;
		}
		free(dtio->ip_str);
		dtio->ip_str = strdup(cfg->dnstap_ip);
		if(!dtio->ip_str) {
			log_err("dnstap setup: malloc failure");
			return 0;
		}
	}

	if(dtio->upstream_is_tls) {
#ifdef HAVE_SSL
		if(cfg->dnstap_tls_server_name &&
			cfg->dnstap_tls_server_name[0]) {
			free(dtio->tls_server_name);
			dtio->tls_server_name = strdup(
				cfg->dnstap_tls_server_name);
			if(!dtio->tls_server_name) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
			if(!check_auth_name_for_ssl(dtio->tls_server_name))
				return 0;
		}
		if(cfg->dnstap_tls_client_key_file &&
			cfg->dnstap_tls_client_key_file[0]) {
			dtio->use_client_certs = 1;
			free(dtio->client_key_file);
			dtio->client_key_file = strdup(
				cfg->dnstap_tls_client_key_file);
			if(!dtio->client_key_file) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
			/* a client key is useless without its certificate */
			if(!cfg->dnstap_tls_client_cert_file ||
				cfg->dnstap_tls_client_cert_file[0]==0) {
				log_err("dnstap setup: client key "
					"authentication enabled with "
					"dnstap-tls-client-key-file, but "
					"no dnstap-tls-client-cert-file "
					"is given");
				return 0;
			}
			free(dtio->client_cert_file);
			dtio->client_cert_file = strdup(
				cfg->dnstap_tls_client_cert_file);
			if(!dtio->client_cert_file) {
				log_err("dnstap setup: malloc failure");
				return 0;
			}
		} else {
			dtio->use_client_certs = 0;
			/* release earlier values instead of dropping the
			 * pointers, like the other strings in here */
			free(dtio->client_key_file);
			dtio->client_key_file = NULL;
			free(dtio->client_cert_file);
			dtio->client_cert_file = NULL;
		}

		/* prefer the dnstap specific bundle over the general one */
		if(cfg->dnstap_tls_cert_bundle) {
			dtio->ssl_ctx = connect_sslctx_create(
				dtio->client_key_file,
				dtio->client_cert_file,
				cfg->dnstap_tls_cert_bundle, 0);
		} else {
			dtio->ssl_ctx = connect_sslctx_create(
				dtio->client_key_file,
				dtio->client_cert_file,
				cfg->tls_cert_bundle, cfg->tls_win_cert);
		}
		if(!dtio->ssl_ctx) {
			log_err("could not setup SSL CTX");
			return 0;
		}
		dtio->tls_use_sni = cfg->tls_use_sni;
#endif /* HAVE_SSL */
	}
	return 1;
}
/**
 * Register a message queue with the io thread: the queue gets a pointer to
 * the io thread (set under the queue lock) and is put at the front of the
 * io thread's queue list. The list iterator is reset.
 * @param dtio: the io thread.
 * @param mq: the queue to register.
 * @return 1 on success, 0 when out of memory.
 */
int dt_io_thread_register_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	struct dt_io_list_item* node;

	node = malloc(sizeof(*node));
	if(node == NULL)
		return 0;

	lock_basic_lock(&mq->lock);
	mq->dtio = dtio;
	lock_basic_unlock(&mq->lock);

	/* push onto the head of the list */
	node->queue = mq;
	node->next = dtio->io_list;
	dtio->io_list = node;
	dtio->io_list_iter = NULL;
	return 1;
}
/**
 * Remove a message queue from the io thread's list. The first list item
 * that refers to the queue is unlinked and freed, the queue's io thread
 * pointer is cleared under the queue lock, and the list iterator is reset.
 * Nothing happens if the queue is not in the list.
 * @param dtio: the io thread; NULL is accepted and ignored.
 * @param mq: the queue to unregister.
 */
void dt_io_thread_unregister_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	struct dt_io_list_item** link;

	if(!dtio)
		return;
	/* link points at the pointer that refers to the current item, so
	 * head and interior removal are the same operation */
	for(link = &dtio->io_list; *link != NULL; link = &(*link)->next) {
		struct dt_io_list_item* found = *link;
		if(found->queue != mq)
			continue;
		*link = found->next;
		lock_basic_lock(&found->queue->lock);
		found->queue->dtio = NULL;
		lock_basic_unlock(&found->queue->lock);
		free(found);
		dtio->io_list_iter = NULL;
		return;
	}
}
/**
 * Take the first message off the queue, under the queue lock.
 * @param mq: the message queue.
 * @param buf: on success set to the message buffer; the caller owns it.
 * @param len: on success set to the message length.
 * @return 1 if a message was removed, 0 if the queue was empty.
 */
static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
	size_t* len)
{
	struct dt_msg_entry* head;

	lock_basic_lock(&mq->lock);
	head = mq->first;
	if(head == NULL) {
		lock_basic_unlock(&mq->lock);
		return 0;
	}
	mq->first = head->next;
	if(head->next == NULL)
		mq->last = NULL; /* that was the only entry */
	mq->cursize -= head->len;
	mq->msgcount --;
	lock_basic_unlock(&mq->lock);

	/* the entry is private to this thread now */
	*buf = head->buf;
	*len = head->len;
	free(head);
	return 1;
}
/**
 * Try to fetch a message from one queue and make it the current message of
 * the io thread, with the written-so-far counters reset.
 * @param dtio: the io thread.
 * @param mq: the queue to look in.
 * @return 1 if a message was picked up, 0 if the queue was empty.
 */
static int dtio_find_in_queue(struct dt_io_thread* dtio,
	struct dt_msg_queue* mq)
{
	void* msg = NULL;
	size_t msglen = 0;

	if(!dt_msg_queue_pop(mq, &msg, &msglen))
		return 0;
	dtio->cur_msg = msg;
	dtio->cur_msg_len = msglen;
	dtio->cur_msg_done = 0;
	dtio->cur_msg_len_done = 0;
	return 1;
}
/**
 * Find a new message to write. The search starts at the list iterator
 * position and continues to the end of the list, then the whole list is
 * tried from the start. The iterator is advanced beforehand so the next
 * search begins at a different queue.
 * @param dtio: the io thread.
 * @return 1 if a message was found (stored as the current message),
 *	0 if all queues are empty.
 */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
	struct dt_io_list_item* start = dtio->io_list_iter;
	struct dt_io_list_item* it;

	/* move the iterator on for the next call */
	if(start)
		dtio->io_list_iter = start->next;
	else if(dtio->io_list)
		dtio->io_list_iter = dtio->io_list->next;

	for(it = start; it != NULL; it = it->next) {
		if(dtio_find_in_queue(dtio, it->queue))
			return 1;
	}
	for(it = dtio->io_list; it != NULL; it = it->next) {
		if(dtio_find_in_queue(dtio, it->queue))
			return 1;
	}
	return 0;
}
/**
 * Reconnect timer callback: try to open the output again. When that
 * produced an output event, enable it for writing; otherwise schedule the
 * next reconnect attempt.
 * @param fd: unused.
 * @param bits: unused.
 * @param arg: the struct dt_io_thread.
 */
void dtio_reconnect_timeout_cb(int ATTR_UNUSED(fd),
	short ATTR_UNUSED(bits), void* arg)
{
	struct dt_io_thread* dtio = (struct dt_io_thread*)arg;

	dtio->reconnect_is_added = 0;
	verbose(VERB_ALGO, "dnstap io: reconnect timer");

	dtio_open_output(dtio);
	if(!dtio->event) {
		/* still no output, try again later */
		dtio_reconnect_enable(dtio);
		return;
	}
	/* a failure here has been handled inside the call */
	(void)dtio_add_output_event_write(dtio);
}
/**
 * Schedule a reconnect attempt, unless the thread wants to exit or the
 * timer is already added. The timer is set to the currently stored
 * reconnect timeout; the stored value is then advanced for the next
 * attempt (DTIO_RECONNECT_TIMEOUT_MIN after 0, otherwise doubled and
 * capped at DTIO_RECONNECT_TIMEOUT_MAX).
 * @param dtio: the io thread.
 */
static void dtio_reconnect_enable(struct dt_io_thread* dtio)
{
	struct timeval tv;
	int msec;

	if(dtio->want_to_exit || dtio->reconnect_is_added)
		return;

	/* use the current value now, store the backed-off value for later */
	msec = dtio->reconnect_timeout;
	if(msec != 0) {
		dtio->reconnect_timeout = msec*2;
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MAX)
			dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MAX;
	} else {
		dtio->reconnect_timeout = DTIO_RECONNECT_TIMEOUT_MIN;
	}
	verbose(VERB_ALGO, "dnstap io: set reconnect attempt after %d msec",
		msec);

	memset(&tv, 0, sizeof(tv));
	tv.tv_sec = msec/1000;
	tv.tv_usec = (msec%1000)*1000;
	if(ub_timer_add(dtio->reconnect_timer, dtio->event_base,
		&dtio_reconnect_timeout_cb, dtio, &tv) == 0) {
		dtio->reconnect_is_added = 1;
		return;
	}
	log_err("dnstap io: could not reconnect ev timer add");
}
/**
 * Remove the reconnect timer if it is currently added.
 * @param dtio: the io thread.
 */
static void dtio_reconnect_del(struct dt_io_thread* dtio)
{
	if(dtio->reconnect_is_added) {
		ub_timer_del(dtio->reconnect_timer);
		dtio->reconnect_is_added = 0;
	}
}
/**
 * Reset the reconnect backoff to zero and remove a pending reconnect
 * timer.
 * @param dtio: the io thread.
 */
static void
dtio_reconnect_clear(struct dt_io_thread* dtio)
{
	/* back to the start of the backoff sequence */
	dtio->reconnect_timeout = 0;
	dtio_reconnect_del(dtio);
}
/**
 * Restart the reconnect timer with an explicit timeout instead of the
 * stored backoff value.
 * @param dtio: the io thread.
 * @param msec: timeout in milliseconds for the next reconnect attempt.
 */
static void
dtio_reconnect_slow(struct dt_io_thread* dtio, int msec)
{
	/* drop a pending timer so that enable arms a fresh one */
	dtio_reconnect_del(dtio);
	dtio->reconnect_timeout = msec;
	dtio_reconnect_enable(dtio);
}
/**
 * Free the current message buffer and reset the current message length and
 * progress counters.
 * @param dtio: the io thread.
 */
static void
dtio_cur_msg_free(struct dt_io_thread* dtio)
{
	free(dtio->cur_msg);
	dtio->cur_msg = NULL;
	/* nothing pending and nothing written */
	dtio->cur_msg_len_done = 0;
	dtio->cur_msg_done = 0;
	dtio->cur_msg_len = 0;
}
/**
 * Release the buffer of a frame read structure and reset all of its
 * counters and flags, ready for reading a new frame.
 * @param rb: the frame read buffer state.
 */
static void
dtio_read_frame_free(struct dt_frame_read_buf* rb)
{
	/* free accepts NULL, so no test for an absent buffer is needed */
	free(rb->buf);
	rb->buf = NULL;
	rb->buf_cap = 0;
	rb->buf_count = 0;
	rb->frame_len_done = 0;
	rb->frame_len = 0;
	rb->control_frame = 0;
}
/**
 * Remove the output event from the event base if it was added, and clear
 * the added and is-write flags.
 * @param dtio: the io thread.
 */
static void
dtio_del_output_event(struct dt_io_thread* dtio)
{
	if(dtio->event_added) {
		ub_event_del(dtio->event);
		dtio->event_added_is_write = 0;
		dtio->event_added = 0;
	}
}
/**
 * Close the output file descriptor and mark it as closed (-1).
 * @param dtio: the io thread.
 */
static void
dtio_close_fd(struct dt_io_thread* dtio)
{
	sock_close(dtio->fd);
	/* -1 is what the rest of the code tests for "no output" */
	dtio->fd = -1;
}
/**
 * Close the output connection: free the output event, shut down and free
 * the SSL object (when built with SSL), close the fd, drop the current
 * message, reset the control frame handshake state and read buffer, and
 * schedule a reconnect. Does nothing when there is no output event.
 * The event is expected to have been removed from the base already.
 * @param dtio: the io thread.
 */
static void dtio_close_output(struct dt_io_thread* dtio)
{
	if(dtio->event == NULL)
		return;
	ub_event_free(dtio->event);
	dtio->event = NULL;
#ifdef HAVE_SSL
	if(dtio->ssl) {
		SSL_shutdown(dtio->ssl);
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
	}
#endif
	dtio_close_fd(dtio);

	/* the message that was being sent is lost with the connection */
	if(dtio->cur_msg)
		dtio_cur_msg_free(dtio);

	/* a new connection starts the READY/ACCEPT exchange from scratch */
	dtio->accept_frame_received = 0;
	dtio->ready_frame_sent = 0;
	dtio_read_frame_free(&dtio->read_frame);

	dtio_reconnect_enable(dtio);
}
/**
 * Check the outcome of a nonblocking connect on the output fd.
 * @param dtio: the io thread.
 * @return 1 if no check is pending or the connect succeeded (the reconnect
 *	backoff is then cleared and the pending-check flag reset), 0 if the
 *	connect is still in progress, -1 if the connect failed (logged).
 */
static int dtio_check_nb_connect(struct dt_io_thread* dtio)
{
int error = 0;
socklen_t len = (socklen_t)sizeof(error);
if(!dtio->check_nb_connect)
return 1;
/* fetch the pending socket error; if that call itself fails, use its
 * error code instead */
if(getsockopt(dtio->fd, SOL_SOCKET, SO_ERROR, (void*)&error,
&len) < 0) {
#ifndef USE_WINSOCK
error = errno;
#else
error = WSAGetLastError();
#endif
}
#ifndef USE_WINSOCK
#if defined(EINPROGRESS) && defined(EWOULDBLOCK)
/* not done yet, wait for the next event */
if(error == EINPROGRESS || error == EWOULDBLOCK)
return 0;
#endif
#else
if(error == WSAEINPROGRESS) {
return 0;
} else if(error == WSAEWOULDBLOCK) {
/* report the wouldblock on whichever event is active: the stop flush
 * event when it is set, otherwise the regular output event */
ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
dtio->stop_flush_event:dtio->event), UB_EV_WRITE);
return 0;
}
#endif
if(error != 0) {
/* name the destination in the log: socket path, else IP, else empty */
char* to = dtio->socket_path;
if(!to) to = dtio->ip_str;
if(!to) to = "";
log_err("dnstap io: failed to connect to \"%s\": %s",
to, sock_strerror(error));
return -1;
}
if(dtio->ip_str)
verbose(VERB_DETAIL, "dnstap io: connected to %s",
dtio->ip_str);
else if(dtio->socket_path)
verbose(VERB_DETAIL, "dnstap io: connected to \"%s\"",
dtio->socket_path);
/* connected: reset the reconnect backoff and stop checking */
dtio_reconnect_clear(dtio);
dtio->check_nb_connect = 0;
return 1;
}
#ifdef HAVE_SSL
/**
 * Write a buffer over the SSL connection.
 * @param dtio: the io thread, with dtio->ssl set.
 * @param buf: data to write.
 * @param len: number of bytes in buf.
 * @return the number of bytes written (greater than 0), 0 if nothing was
 *	written and the write has to be retried later, or -1 if the
 *	connection was closed or failed.
 */
static int dtio_write_ssl(struct dt_io_thread* dtio, uint8_t* buf,
size_t len)
{
int r;
/* clear the error queue so SSL_get_error reflects this call only */
ERR_clear_error();
r = SSL_write(dtio->ssl, buf, len);
if(r <= 0) {
int want = SSL_get_error(dtio->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) {
/* closed by the other side */
return -1;
} else if(want == SSL_ERROR_WANT_READ) {
/* SSL needs to read first; switch the event to reading briefly */
dtio_enable_brief_read(dtio);
return 0;
} else if(want == SSL_ERROR_WANT_WRITE) {
/* retry when the socket is writable again */
return 0;
} else if(want == SSL_ERROR_SYSCALL) {
/* broken pipe and connection reset are not logged at low verbosity */
#ifdef EPIPE
if(errno == EPIPE && verbosity < 2)
return -1;
#endif
#ifdef ECONNRESET
if(errno == ECONNRESET && verbosity < 2)
return -1;
#endif
if(errno != 0) {
log_err("dnstap io, SSL_write syscall: %s",
strerror(errno));
}
return -1;
}
log_crypto_err_io("dnstap io, could not SSL_write", want);
return -1;
}
return r;
}
#endif
/**
 * Write a buffer to the output, over SSL when an SSL object is present and
 * with send() otherwise.
 * @param dtio: the io thread.
 * @param buf: data to write.
 * @param len: number of bytes in buf.
 * @return the number of bytes written, 0 if nothing could be written right
 *	now (retry later), or -1 on error or when the output fd is closed.
 *	The caller is responsible for closing the output on -1.
 */
static int dtio_write_buf(struct dt_io_thread* dtio, uint8_t* buf,
size_t len)
{
ssize_t ret;
if(dtio->fd == -1)
return -1;
#ifdef HAVE_SSL
if(dtio->ssl)
return dtio_write_ssl(dtio, buf, len);
#endif
ret = send(dtio->fd, (void*)buf, len, 0);
if(ret == -1) {
/* temporary conditions are reported as "nothing written" */
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return 0;
#else
if(WSAGetLastError() == WSAEINPROGRESS)
return 0;
if(WSAGetLastError() == WSAEWOULDBLOCK) {
/* report the wouldblock on the stop flush event when it is set,
 * otherwise on the regular output event */
ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
dtio->stop_flush_event:dtio->event),
UB_EV_WRITE);
return 0;
}
#endif
log_err("dnstap io: failed send: %s", sock_strerror(errno));
return -1;
}
return ret;
}
#ifdef HAVE_WRITEV
/**
 * Write the remaining part of the 4-byte length prefix together with the
 * message data in one writev() call. Called from dtio_write_more_of_len
 * while the length prefix is incomplete and no SSL is in use.
 * @param dtio: the io thread with a current message.
 * @return 1 if length and data have been written completely, 0 if more
 *	has to be written later or the write failed (on failure the output
 *	event is removed and the output closed).
 */
static int dtio_write_with_writev(struct dt_io_thread* dtio)
{
/* message length in network byte order, sent before the data */
uint32_t sendlen = htonl(dtio->cur_msg_len);
struct iovec iov[2];
ssize_t r;
/* first vector: the not yet written bytes of the length prefix */
iov[0].iov_base = ((uint8_t*)&sendlen)+dtio->cur_msg_len_done;
iov[0].iov_len = sizeof(sendlen)-dtio->cur_msg_len_done;
/* second vector: the whole message */
iov[1].iov_base = dtio->cur_msg;
iov[1].iov_len = dtio->cur_msg_len;
log_assert(iov[0].iov_len > 0);
r = writev(dtio->fd, iov, 2);
if(r == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return 0;
#else
if(WSAGetLastError() == WSAEINPROGRESS)
return 0;
if(WSAGetLastError() == WSAEWOULDBLOCK) {
ub_winsock_tcp_wouldblock((dtio->stop_flush_event?
dtio->stop_flush_event:dtio->event),
UB_EV_WRITE);
return 0;
}
#endif
log_err("dnstap io: failed writev: %s", sock_strerror(errno));
dtio_del_output_event(dtio);
dtio_close_output(dtio);
return 0;
}
/* account all written bytes to the length counter first */
dtio->cur_msg_len_done += r;
if(dtio->cur_msg_len_done < 4)
return 0;
if(dtio->cur_msg_len_done > 4) {
/* bytes beyond the 4-byte prefix were message data */
dtio->cur_msg_done = dtio->cur_msg_len_done-4;
dtio->cur_msg_len_done = 4;
}
if(dtio->cur_msg_done < dtio->cur_msg_len)
return 0;
return 1;
}
#endif
/**
 * Write more of the 4-byte length prefix of the current message. Without
 * SSL and with writev available, length and data are written together by
 * dtio_write_with_writev.
 * @param dtio: the io thread with a current message.
 * @return 1 if the length prefix is complete (with writev: the whole
 *	message), 0 if more must be written later or the output was closed
 *	because of a write error.
 */
static int dtio_write_more_of_len(struct dt_io_thread* dtio)
{
	uint32_t sendlen;
	int r;

	if(dtio->cur_msg_len_done >= 4)
		return 1;
#ifdef HAVE_WRITEV
	if(!dtio->ssl) {
		/* do the write of length and data in one go */
		return dtio_write_with_writev(dtio);
	}
#endif
	sendlen = htonl(dtio->cur_msg_len);
	r = dtio_write_buf(dtio,
		((uint8_t*)&sendlen)+dtio->cur_msg_len_done,
		sizeof(sendlen)-dtio->cur_msg_len_done);
	if(r == 0) {
		/* try again later */
		return 0;
	}
	if(r == -1) {
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	dtio->cur_msg_len_done += r;
	return (dtio->cur_msg_len_done < 4) ? 0 : 1;
}
/**
 * Write more of the data of the current message.
 * @param dtio: the io thread with a current message.
 * @return 1 if the message data is completely written, 0 if more must be
 *	written later or the output was closed because of a write error.
 */
static int dtio_write_more_of_data(struct dt_io_thread* dtio)
{
	int r;

	if(dtio->cur_msg_done >= dtio->cur_msg_len)
		return 1;
	r = dtio_write_buf(dtio,
		((uint8_t*)dtio->cur_msg)+dtio->cur_msg_done,
		dtio->cur_msg_len - dtio->cur_msg_done);
	if(r == 0) {
		/* try again later */
		return 0;
	}
	if(r == -1) {
		/* close the channel */
		dtio_del_output_event(dtio);
		dtio_close_output(dtio);
		return 0;
	}
	dtio->cur_msg_done += r;
	return (dtio->cur_msg_done < dtio->cur_msg_len) ? 0 : 1;
}
/**
 * Write more of the current message: first the length prefix, then the
 * data.
 * @param dtio: the io thread with a current message.
 * @return 1 if the message is completely written, 0 if the write has to
 *	continue later or the output was closed.
 */
static int dtio_write_more(struct dt_io_thread* dtio)
{
	if(dtio->cur_msg_len_done < 4 && !dtio_write_more_of_len(dtio))
		return 0;
	if(dtio->cur_msg_done < dtio->cur_msg_len &&
		!dtio_write_more_of_data(dtio))
		return 0;
	return 1;
}
/**
 * Receive bytes from the output fd without blocking.
 * @param dtio: the io thread.
 * @param buf: buffer to store the received bytes in.
 * @param len: capacity of buf.
 * @return the number of bytes received (greater than 0), -1 if nothing is
 *	available now (try again later), or 0 if the connection was closed
 *	by the other side or failed. The caller closes the output on 0.
 */
static ssize_t receive_bytes(struct dt_io_thread* dtio, void* buf, size_t len) {
ssize_t r;
r = recv(dtio->fd, (void*)buf, len, MSG_DONTWAIT);
if(r == -1) {
/* destination name for the log: socket path, else IP, else empty */
char* to = dtio->socket_path;
if(!to) to = dtio->ip_str;
if(!to) to = "";
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return -1;
#else
if(WSAGetLastError() == WSAEINPROGRESS) {
return -1;
} else if(WSAGetLastError() == WSAEWOULDBLOCK) {
ub_winsock_tcp_wouldblock(
(dtio->stop_flush_event?
dtio->stop_flush_event:dtio->event),
UB_EV_READ);
return -1;
}
#endif
/* stay quiet about the failure once the reconnect backoff has grown
 * past its minimum, unless verbosity is high */
if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
verbosity < 4)
return 0;
log_err("dnstap io: output closed, recv %s: %s", to,
strerror(errno));
return 0;
}
if(r == 0) {
/* orderly close by the peer; same log suppression as above */
if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
verbosity < 4)
return 0;
verbose(VERB_DETAIL, "dnstap io: output closed by the other side");
return 0;
}
return r;
}
#ifdef HAVE_SSL
/**
 * Read bytes from the SSL connection.
 * @param dtio: the io thread, with dtio->ssl set.
 * @param buf: buffer to store the bytes in.
 * @param len: capacity of buf.
 * @return the number of bytes read (greater than 0), -1 if the read has to
 *	be retried later, or 0 if the connection was closed or failed.
 */
static int ssl_read_bytes(struct dt_io_thread* dtio, void* buf, size_t len)
{
int r;
/* clear the error queue so SSL_get_error reflects this call only */
ERR_clear_error();
r = SSL_read(dtio->ssl, buf, len);
if(r <= 0) {
int want = SSL_get_error(dtio->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) {
/* closed by the peer; not logged once the reconnect backoff has grown
 * past its minimum, unless verbosity is high */
if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
verbosity < 4)
return 0;
verbose(VERB_DETAIL, "dnstap io: output closed by the "
"other side");
return 0;
} else if(want == SSL_ERROR_WANT_READ) {
/* continue later */
return -1;
} else if(want == SSL_ERROR_WANT_WRITE) {
/* SSL needs to write first; switch the event to writing briefly */
(void)dtio_enable_brief_write(dtio);
return -1;
} else if(want == SSL_ERROR_SYSCALL) {
#ifdef ECONNRESET
if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
errno == ECONNRESET && verbosity < 4)
return 0;
#endif
if(errno != 0)
log_err("SSL_read syscall: %s",
strerror(errno));
verbose(VERB_DETAIL, "dnstap io: output closed by the "
"other side");
return 0;
}
log_crypto_err_io("could not SSL_read", want);
verbose(VERB_DETAIL, "dnstap io: output closed by the "
"other side");
return 0;
}
return r;
}
#endif
/**
 * Check whether the other side closed the connection by draining
 * whatever can be read from the fd (the data itself is discarded).
 * @param dtio: the io thread.
 * @return 1 if the connection is still open (nothing more to read now),
 *	0 if there is no fd or the connection was found closed; in the
 *	latter case the output event is removed and the output closed.
 */
static int dtio_check_close(struct dt_io_thread* dtio)
{
	uint8_t scratch[1024];
	int r;

	if(dtio->fd == -1)
		return 0;
	do {
		r = receive_bytes(dtio, (void*)scratch, sizeof(scratch));
		if(r == -1)
			return 1; /* drained, still connected */
	} while(r != 0);

	/* the other end has been closed */
	dtio_del_output_event(dtio);
	dtio_close_output(dtio);
	return 0;
}
/**
 * Read the ACCEPT control frame that is expected after the READY frame
 * was sent on a bidirectional connection. Reading is incremental: state
 * is kept in dtio->read_frame between calls.
 *
 * A zero length word marks a control frame and is followed by the control
 * frame length. The frame payload starts with the control type, followed
 * by (type, length, value) option fields. When a content type field equal
 * to DNSTAP_CONTENT_TYPE is found, dtio_control_start_send is called and
 * the output event is switched to writing.
 *
 * @param dtio: the io thread.
 * @return 1 if a valid ACCEPT frame was processed,
 *	-1 if more data is needed, or a control frame of another type was
 *	received and ignored,
 *	0 if the connection was closed (peer closed, invalid data, frame
 *	too large, out of memory, unknown content type); the output event
 *	is removed and a slow reconnect is scheduled.
 */
static int dtio_read_accept_frame(struct dt_io_thread* dtio)
{
	int r;
	size_t read_frame_done;
	while(dtio->read_frame.frame_len_done < 4) {
#ifdef HAVE_SSL
		if(dtio->ssl) {
			r = ssl_read_bytes(dtio,
				(uint8_t*)&dtio->read_frame.frame_len+
				dtio->read_frame.frame_len_done,
				4-dtio->read_frame.frame_len_done);
		} else {
#endif
			r = receive_bytes(dtio,
				(uint8_t*)&dtio->read_frame.frame_len+
				dtio->read_frame.frame_len_done,
				4-dtio->read_frame.frame_len_done);
#ifdef HAVE_SSL
		}
#endif
		if(r == -1)
			return -1; /* continue reading */
		if(r == 0) {
			/* connection closed */
			goto close_connection;
		}
		dtio->read_frame.frame_len_done += r;
		if(dtio->read_frame.frame_len_done < 4)
			return -1; /* continue reading */

		if(dtio->read_frame.frame_len == 0) {
			/* zero length escape: a control frame follows, read
			 * its length word next */
			dtio->read_frame.frame_len_done = 0;
			dtio->read_frame.control_frame = 1;
			continue;
		}
		dtio->read_frame.frame_len = ntohl(dtio->read_frame.frame_len);
		if(dtio->read_frame.frame_len > DTIO_RECV_FRAME_MAX_LEN) {
			verbose(VERB_OPS, "dnstap: received frame exceeds max "
				"length of %d bytes, closing connection",
				DTIO_RECV_FRAME_MAX_LEN);
			goto close_connection;
		}
		dtio->read_frame.buf = calloc(1, dtio->read_frame.frame_len);
		dtio->read_frame.buf_cap = dtio->read_frame.frame_len;
		if(!dtio->read_frame.buf) {
			log_err("dnstap io: out of memory (creating read "
				"buffer)");
			goto close_connection;
		}
	}
	if(dtio->read_frame.buf_count < dtio->read_frame.frame_len) {
#ifdef HAVE_SSL
		if(dtio->ssl) {
			r = ssl_read_bytes(dtio, dtio->read_frame.buf+
				dtio->read_frame.buf_count,
				dtio->read_frame.buf_cap-
				dtio->read_frame.buf_count);
		} else {
#endif
			r = receive_bytes(dtio, dtio->read_frame.buf+
				dtio->read_frame.buf_count,
				dtio->read_frame.buf_cap-
				dtio->read_frame.buf_count);
#ifdef HAVE_SSL
		}
#endif
		if(r == -1)
			return -1; /* continue reading */
		if(r == 0) {
			/* connection closed */
			goto close_connection;
		}
		dtio->read_frame.buf_count += r;
		if(dtio->read_frame.buf_count < dtio->read_frame.frame_len)
			return -1; /* continue reading */
	}

	/* Complete frame received, check if this is a valid ACCEPT control
	 * frame. */
	if(dtio->read_frame.frame_len < 4) {
		verbose(VERB_OPS, "dnstap: invalid data received");
		goto close_connection;
	}
	if(sldns_read_uint32(dtio->read_frame.buf) !=
		FSTRM_CONTROL_FRAME_ACCEPT) {
		verbose(VERB_ALGO, "dnstap: invalid control type received, "
			"ignored");
		dtio->ready_frame_sent = 0;
		dtio->accept_frame_received = 0;
		dtio_read_frame_free(&dtio->read_frame);
		return -1;
	}
	/* walk the option fields that follow the 4-byte control type */
	read_frame_done = 4;
	while(read_frame_done+8 < dtio->read_frame.frame_len) {
		uint32_t type = sldns_read_uint32(dtio->read_frame.buf +
			read_frame_done);
		uint32_t len = sldns_read_uint32(dtio->read_frame.buf +
			read_frame_done + 4);
		if(type == FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE) {
			if(len == strlen(DNSTAP_CONTENT_TYPE) &&
				read_frame_done+8+len <=
				dtio->read_frame.frame_len &&
				memcmp(dtio->read_frame.buf + read_frame_done
					+ 8, DNSTAP_CONTENT_TYPE, len) == 0) {
				if(!dtio_control_start_send(dtio)) {
					verbose(VERB_OPS, "dnstap io: out of "
					 "memory while sending START frame");
					goto close_connection;
				}
				dtio->accept_frame_received = 1;
				if(!dtio_add_output_event_write(dtio))
					goto close_connection;
				return 1;
			} else {
				/* unknown content type */
				verbose(VERB_ALGO, "dnstap: ACCEPT frame "
					"contains unknown content type, "
					"closing connection");
				goto close_connection;
			}
		}
		/* Unknown option, skip it. The length comes from the peer:
		 * check it against what is left in the frame, because the
		 * sum 8+len is computed in 32 bits and can wrap around (to
		 * zero for len 0xfffffff8), which would stall this loop.
		 * The loop condition guarantees the subtraction below
		 * cannot underflow. */
		if(len > dtio->read_frame.frame_len - read_frame_done - 8) {
			verbose(VERB_OPS, "dnstap: invalid data received");
			goto close_connection;
		}
		read_frame_done += 8+(size_t)len;
	}
	/* no content type field found, fall through and close */

close_connection:
	dtio_del_output_event(dtio);
	dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
	dtio_close_output(dtio);
	return 0;
}
/**
 * Make sure the output event is added without the write bit, so that the
 * callback is not triggered by writability.
 * @param dtio: the io thread.
 * @return 1 if the event is (already) added in this mode, 0 if there is no
 *	event or adding it failed; on failure the output is closed.
 */
static int dtio_add_output_event_read(struct dt_io_thread* dtio)
{
	if(!dtio->event)
		return 0;
	if(dtio->event_added) {
		if(!dtio->event_added_is_write)
			return 1; /* already in the wanted mode */
		/* remove it first, so the bits can be changed */
		ub_event_del(dtio->event);
	}
	ub_event_del_bits(dtio->event, UB_EV_WRITE);
	if(ub_event_add(dtio->event, NULL) == 0) {
		dtio->event_added = 1;
		dtio->event_added_is_write = 0;
		return 1;
	}
	log_err("dnstap io: out of memory (adding event)");
	dtio->event_added = 0;
	dtio->event_added_is_write = 0;
	/* close output and start reattempts to open it */
	dtio_close_output(dtio);
	return 0;
}
/**
 * Make sure the output event is added with the write bit set, so that the
 * callback is triggered when the output can be written.
 * @param dtio: the io thread.
 * @return 1 if the event is (already) added in this mode, 0 if there is no
 *	event or adding it failed; on failure the output is closed.
 */
static int dtio_add_output_event_write(struct dt_io_thread* dtio)
{
	if(!dtio->event)
		return 0;
	if(dtio->event_added) {
		if(dtio->event_added_is_write)
			return 1; /* already in the wanted mode */
		/* remove it first, so the bits can be changed */
		ub_event_del(dtio->event);
	}
	ub_event_add_bits(dtio->event, UB_EV_WRITE);
	if(ub_event_add(dtio->event, NULL) == 0) {
		dtio->event_added = 1;
		dtio->event_added_is_write = 1;
		return 1;
	}
	log_err("dnstap io: out of memory (adding event)");
	dtio->event_added = 0;
	dtio->event_added_is_write = 0;
	/* close output and start reattempts to open it */
	dtio_close_output(dtio);
	return 0;
}
/**
 * Put the io thread to sleep: the output event stops listening for
 * writability and the wakeup timer flag is cleared under its lock, so a
 * later submit can arm the wakeup timer again.
 * @param dtio: the io thread.
 */
static void
dtio_sleep(struct dt_io_thread* dtio)
{
	/* a failure here has closed the output already */
	(void)dtio_add_output_event_read(dtio);

	lock_basic_lock(&dtio->wakeup_timer_lock);
	dtio->wakeup_timer_enabled = 0;
	lock_basic_unlock(&dtio->wakeup_timer_lock);
}
#ifdef HAVE_SSL
/**
 * Flag that SSL wants a brief read and switch the active event to listen
 * without the write bit. During stop flush the stop flush event is
 * adjusted, otherwise the regular output event.
 * @param dtio: the io thread.
 * @return 1 on success, 0 if the event could not be added.
 */
static int dtio_enable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 1;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_read(dtio);

	/* re-add the stop flush event with the write bit removed */
	ub_event_del(dtio->stop_flush_event);
	ub_event_del_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif
#ifdef HAVE_SSL
/**
 * Clear the SSL brief read flag and switch the active event back to
 * listening with the write bit. During stop flush the stop flush event is
 * adjusted, otherwise the regular output event.
 * @param dtio: the io thread.
 * @return 1 on success, 0 if the event could not be added.
 */
static int dtio_disable_brief_read(struct dt_io_thread* dtio)
{
	dtio->ssl_brief_read = 0;
	if(!dtio->stop_flush_event)
		return dtio_add_output_event_write(dtio);

	/* re-add the stop flush event with the write bit set */
	ub_event_del(dtio->stop_flush_event);
	ub_event_add_bits(dtio->stop_flush_event, UB_EV_WRITE);
	if(ub_event_add(dtio->stop_flush_event, NULL) == 0)
		return 1;
	log_err("dnstap io, stop flush, could not ub_event_add");
	return 0;
}
#endif
#ifdef HAVE_SSL
/**
 * Flag that SSL wants a brief write and make the output event listen for
 * writability.
 * @param dtio: the io thread.
 * @return the result of dtio_add_output_event_write.
 */
static int
dtio_enable_brief_write(struct dt_io_thread* dtio)
{
	int added;

	dtio->ssl_brief_write = 1;
	added = dtio_add_output_event_write(dtio);
	return added;
}
#endif
#ifdef HAVE_SSL
/**
 * Clear the SSL brief write flag and make the output event stop listening
 * for writability.
 * @param dtio: the io thread.
 * @return the result of dtio_add_output_event_read.
 */
static int
dtio_disable_brief_write(struct dt_io_thread* dtio)
{
	int added;

	dtio->ssl_brief_write = 0;
	added = dtio_add_output_event_read(dtio);
	return added;
}
#endif
#ifdef HAVE_SSL
/**
 * Check the peer after the SSL handshake. Only when peer verification is
 * enabled on the SSL object is the verify result inspected; the outcome is
 * logged at VERB_ALGO in all cases.
 * @param dtio: the io thread, with dtio->ssl set.
 * @return 1 if the peer is acceptable (verified, or verification is not
 *	enabled), 0 if verification failed or no certificate was presented.
 */
static int dtio_ssl_check_peer(struct dt_io_thread* dtio)
{
if((SSL_get_verify_mode(dtio->ssl)&SSL_VERIFY_PEER)) {
/* verification requested, look at the result */
if(SSL_get_verify_result(dtio->ssl) == X509_V_OK) {
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
X509* x = SSL_get1_peer_certificate(dtio->ssl);
#else
X509* x = SSL_get_peer_certificate(dtio->ssl);
#endif
if(!x) {
verbose(VERB_ALGO, "dnstap io, %s, SSL "
"connection failed no certificate",
dtio->ip_str);
return 0;
}
log_cert(VERB_ALGO, "dnstap io, peer certificate",
x);
#ifdef HAVE_SSL_GET0_PEERNAME
/* include the peer name in the log when there is one */
if(SSL_get0_peername(dtio->ssl)) {
verbose(VERB_ALGO, "dnstap io, %s, SSL "
"connection to %s authenticated",
dtio->ip_str,
SSL_get0_peername(dtio->ssl));
} else {
#endif
verbose(VERB_ALGO, "dnstap io, %s, SSL "
"connection authenticated",
dtio->ip_str);
#ifdef HAVE_SSL_GET0_PEERNAME
}
#endif
/* the certificate reference obtained above is released here */
X509_free(x);
} else {
/* verification failed; log the certificate if one was sent */
#ifdef HAVE_SSL_GET1_PEER_CERTIFICATE
X509* x = SSL_get1_peer_certificate(dtio->ssl);
#else
X509* x = SSL_get_peer_certificate(dtio->ssl);
#endif
if(x) {
log_cert(VERB_ALGO, "dnstap io, peer "
"certificate", x);
X509_free(x);
}
verbose(VERB_ALGO, "dnstap io, %s, SSL connection "
"failed: failed to authenticate",
dtio->ip_str);
return 0;
}
} else {
/* unauthenticated, the verify peer flag was not set */
verbose(VERB_ALGO, "dnstap io, %s, SSL connection",
dtio->ip_str);
}
return 1;
}
#endif
#ifdef HAVE_SSL
/**
 * Perform (more of) the SSL handshake on the output connection.
 * @param dtio: the io thread, with dtio->ssl set.
 * @param info: stop flush state when called during stop flush, else NULL.
 *	When set, failures also request exit of the stop flush loop.
 * @return 1 if the handshake is complete and the peer check passed, so
 *	the caller can continue; 0 if the caller must return and wait for
 *	the next event (handshake in progress) or the handshake failed. On
 *	failure the output event is removed, a slow reconnect is scheduled
 *	and the output is closed.
 */
static int dtio_ssl_handshake(struct dt_io_thread* dtio,
struct stop_flush_info* info)
{
int r;
/* a brief read was requested earlier; switch the event back to write
 * mode before continuing */
if(dtio->ssl_brief_read) {
if(!dtio_disable_brief_read(dtio)) {
if(info) dtio_stop_flush_exit(info);
return 0;
}
}
if(dtio->ssl_handshake_done)
return 1;
/* clear the error queue so SSL_get_error reflects this call only */
ERR_clear_error();
r = SSL_do_handshake(dtio->ssl);
if(r != 1) {
int want = SSL_get_error(dtio->ssl, r);
if(want == SSL_ERROR_WANT_READ) {
/* wait for readability, then continue the handshake */
if(!dtio_enable_brief_read(dtio)) {
if(info) dtio_stop_flush_exit(info);
return 0;
}
return 0;
} else if(want == SSL_ERROR_WANT_WRITE) {
/* the event keeps waiting for writability */
return 0;
} else if(r == 0) {
/* handshake was shut down by the peer: close and reconnect slowly */
if(info) dtio_stop_flush_exit(info);
dtio_del_output_event(dtio);
dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
dtio_close_output(dtio);
return 0;
} else if(want == SSL_ERROR_SYSCALL) {
/* decide whether the system error is worth logging */
int silent = 0;
#ifdef EPIPE
if(errno == EPIPE && verbosity < 2)
silent = 1;
#endif
#ifdef ECONNRESET
if(errno == ECONNRESET && verbosity < 2)
silent = 1;
#endif
if(errno == 0)
silent = 1;
if(!silent)
log_err("dnstap io, SSL_handshake syscall: %s",
strerror(errno));
if(info) dtio_stop_flush_exit(info);
dtio_del_output_event(dtio);
dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
dtio_close_output(dtio);
return 0;
} else {
/* other SSL failure; logged unless squelch_err_ssl_handshake says
 * this error code should be suppressed */
unsigned long err = ERR_get_error();
if(!squelch_err_ssl_handshake(err)) {
log_crypto_err_io_code("dnstap io, ssl handshake failed",
want, err);
verbose(VERB_OPS, "dnstap io, ssl handshake failed "
"from %s", dtio->ip_str);
}
if(info) dtio_stop_flush_exit(info);
dtio_del_output_event(dtio);
dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
dtio_close_output(dtio);
return 0;
}
}
/* handshake completed, now check the peer */
dtio->ssl_handshake_done = 1;
if(!dtio_ssl_check_peer(dtio)) {
/* closed: failed to authenticate the peer */
if(info) dtio_stop_flush_exit(info);
dtio_del_output_event(dtio);
dtio_reconnect_slow(dtio, DTIO_RECONNECT_TIMEOUT_SLOW);
dtio_close_output(dtio);
return 0;
}
return 1;
}
#endif
/**
 * Event callback for the output connection. In order: finishes a pending
 * nonblocking connect, continues the SSL handshake, handles readability
 * (reading the ACCEPT frame or detecting a close by the peer), and then
 * writes up to DTIO_MESSAGES_PER_CALLBACK messages from the queues.
 * @param fd: unused.
 * @param bits: event bits that triggered the callback.
 * @param arg: the struct dt_io_thread.
 */
void dtio_output_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
int i;
if(dtio->check_nb_connect) {
int connect_err = dtio_check_nb_connect(dtio);
if(connect_err == -1) {
/* close the channel */
dtio_del_output_event(dtio);
dtio_close_output(dtio);
return;
} else if(connect_err == 0) {
/* try again later */
return;
}
}
#ifdef HAVE_SSL
if(dtio->ssl &&
(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
if(!dtio_ssl_handshake(dtio, NULL))
return;
}
#endif
if((bits&UB_EV_READ) || dtio->ssl_brief_write) {
#ifdef HAVE_SSL
if(dtio->ssl_brief_write)
(void)dtio_disable_brief_write(dtio);
#endif
/* while waiting for ACCEPT, readable data is the control frame;
 * otherwise readability is only used to detect a close */
if(dtio->ready_frame_sent && !dtio->accept_frame_received) {
if(dtio_read_accept_frame(dtio) <= 0)
return;
} else if(!dtio_check_close(dtio))
return;
}
/* loop to process a number of messages, so the event callback is not
 * needed for every single message */
for(i=0; i<DTIO_MESSAGES_PER_CALLBACK; i++) {
/* see if there are messages that need writing */
if(!dtio->cur_msg) {
if(!dtio_find_msg(dtio)) {
if(i == 0) {
/* nothing at all to do: go to sleep, then look once more in case
 * a message was queued before the sleep state took effect */
dtio_sleep(dtio);
if(dtio_find_msg(dtio)) {
if(!dtio_add_output_event_write(dtio))
return;
}
}
if(!dtio->cur_msg)
return; /* nothing to do */
}
}
/* write it */
if(dtio->cur_msg_done < dtio->cur_msg_len) {
if(!dtio_write_more(dtio))
return;
}
/* done with the current message */
dtio_cur_msg_free(dtio);
/* in bidirectional mode the first message written marks the READY
 * frame as sent; stop writing and wait for the ACCEPT frame */
if(dtio->is_bidirectional && !dtio->ready_frame_sent) {
dtio->ready_frame_sent = 1;
(void)dtio_add_output_event_read(dtio);
break;
}
}
}
/**
 * Event callback for the command pipe. Reads one command byte. A wakeup
 * command enables the output write event (unless a bidirectional
 * connection is still waiting for its ACCEPT frame). Every other outcome
 * (stop command, unknown command, closed pipe, read error) makes the io
 * thread exit its event loop.
 * @param fd: the read end of the command pipe.
 * @param bits: unused.
 * @param arg: the struct dt_io_thread.
 */
void dtio_cmd_cb(int fd, short ATTR_UNUSED(bits), void* arg)
{
struct dt_io_thread* dtio = (struct dt_io_thread*)arg;
uint8_t cmd;
ssize_t r;
if(dtio->want_to_exit)
return;
r = read(fd, &cmd, sizeof(cmd));
if(r == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return; /* ignore this */
#else
if(WSAGetLastError() == WSAEINPROGRESS)
return;
if(WSAGetLastError() == WSAEWOULDBLOCK)
return;
#endif
log_err("dnstap io: failed to read: %s", sock_strerror(errno));
/* and then fall through to quit the thread */
} else if(r == 0) {
verbose(VERB_ALGO, "dnstap io: cmd channel closed");
} else if(r == 1 && cmd == DTIO_COMMAND_STOP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd quit");
} else if(r == 1 && cmd == DTIO_COMMAND_WAKEUP) {
verbose(VERB_ALGO, "dnstap io: cmd channel cmd wakeup");
if(dtio->is_bidirectional && !dtio->accept_frame_received) {
verbose(VERB_ALGO, "dnstap io: cmd wakeup ignored, "
"waiting for ACCEPT control frame");
return;
}
/* reregister the event for write, so messages get sent */
if(!dtio_add_output_event_write(dtio))
return;
return;
} else if(r == 1) {
verbose(VERB_ALGO, "dnstap io: cmd channel unknown command");
}
/* all paths that reach here end the io thread's event loop */
dtio->want_to_exit = 1;
if(ub_event_base_loopexit((struct ub_event_base*)dtio->event_base)
!= 0) {
log_err("dnstap io: could not loopexit");
}
}
#ifndef THREADS_DISABLED
/**
 * Create the event base for the io thread. Exits fatally when the base
 * cannot be created.
 * @param dtio: the io thread; its event_base is set.
 * @param secs: time value storage handed to the event base.
 * @param now: time value storage handed to the event base; zeroed first.
 */
static void
dtio_setup_base(struct dt_io_thread* dtio, time_t* secs,
	struct timeval* now)
{
	memset(now, 0, sizeof(*now));
	dtio->event_base = ub_default_event_base(0, secs, now);
	if(dtio->event_base == NULL)
		fatal_exit("dnstap io: could not create event_base");
}
#endif
/**
 * Set up the command channel event: the read end of the command pipe is
 * made nonblocking and a persistent read event with dtio_cmd_cb is added
 * to the event base. Exits fatally on failure.
 * @param dtio: the io thread; its command_event is set.
 */
static void
dtio_setup_cmd(struct dt_io_thread* dtio)
{
	struct ub_event* ev;

	fd_set_nonblock(dtio->commandpipe[0]);
	ev = ub_event_new(dtio->event_base, dtio->commandpipe[0],
		UB_EV_READ | UB_EV_PERSIST, &dtio_cmd_cb, dtio);
	if(ev == NULL)
		fatal_exit("dnstap io: out of memory");
	dtio->command_event = ev;
	if(ub_event_add(ev, NULL) != 0)
		fatal_exit("dnstap io: out of memory (adding event)");
}
/**
 * Set up the reconnect timer event: the reconnect state is cleared and a
 * timeout event with dtio_reconnect_timeout_cb is created (not yet
 * added). Exits fatally when the event cannot be created.
 * @param dtio: the io thread; its reconnect_timer is set.
 */
static void
dtio_setup_reconnect(struct dt_io_thread* dtio)
{
	dtio_reconnect_clear(dtio);
	dtio->reconnect_timer = ub_event_new(dtio->event_base, -1,
		UB_EV_TIMEOUT, &dtio_reconnect_timeout_cb, dtio);
	if(dtio->reconnect_timer == NULL)
		fatal_exit("dnstap io: out of memory");
}
/**
 * State for the stop flush phase, shared by the stop flush callbacks.
 */
struct stop_flush_info {
/** the event base that dtio_stop_flush_exit asks to exit its loop */
struct ub_event_base* base;
/** set once loop exit has been requested; callbacks return early after */
int want_to_exit_flush;
/** set by dtio_stop_timer_cb when the stop flush timer expired */
int timer_done;
/** the io thread whose output is being flushed */
struct dt_io_thread* dtio;
/** buffer with the stop control frame to send */
void* stop_frame;
/** length of the stop control frame in bytes */
size_t stop_frame_len;
/** number of bytes of the stop control frame written so far */
size_t stop_frame_done;
};
/**
 * Request exit of the stop flush event loop. Only the first call has an
 * effect; later calls return without doing anything.
 * @param info: the stop flush state.
 */
static void
dtio_stop_flush_exit(struct stop_flush_info* info)
{
	if(!info->want_to_exit_flush) {
		info->want_to_exit_flush = 1;
		if(ub_event_base_loopexit(info->base) != 0)
			log_err("dnstap io: could not loopexit");
	}
}
/**
 * Write more of the stop control frame during stop flush.
 * @param info: the stop flush state holding the frame and the progress.
 * @return 1 if the stop frame has been written completely, 0 if more has
 *	to be written later or the output failed (in which case exit of
 *	the stop flush loop is requested).
 */
static int dtio_control_stop_send(struct stop_flush_info* info)
{
	struct dt_io_thread* dtio = info->dtio;
	int written;

	if(info->stop_frame_done >= info->stop_frame_len)
		return 1;
	written = dtio_write_buf(dtio, ((uint8_t*)info->stop_frame) +
		info->stop_frame_done, info->stop_frame_len -
		info->stop_frame_done);
	if(written == -1) {
		verbose(VERB_ALGO, "dnstap io: stop flush: output closed");
		dtio_stop_flush_exit(info);
		return 0;
	}
	if(written == 0) {
		/* try again later, or the timeout happens */
		return 0;
	}
	info->stop_frame_done += written;
	return (info->stop_frame_done < info->stop_frame_len) ? 0 : 1;
}
/**
 * Timer callback for the stop flush: the time allowed for flushing is up.
 * Records that the timer fired and asks the flush loop to exit, unless the
 * loop was already asked to exit.
 * @param fd: unused.
 * @param bits: unused.
 * @param arg: the struct stop_flush_info of the flush in progress.
 */
void dtio_stop_timer_cb(int ATTR_UNUSED(fd), short ATTR_UNUSED(bits),
	void* arg)
{
	struct stop_flush_info* flush = (struct stop_flush_info*)arg;

	if(!flush->want_to_exit_flush) {
		verbose(VERB_ALGO,
			"dnstap io: stop flush timer expired, stop flush");
		flush->timer_done = 1;
		dtio_stop_flush_exit(flush);
	}
}
/**
 * Event callback on the output fd during the stop flush.
 * Each call tries to make progress on, in this order: the pending
 * nonblocking connect check, the TLS handshake, detection of a closed
 * channel, the remainder of the current message, and finally the STOP
 * control frame.  When everything has been written the flush loop is asked
 * to exit.  Whenever a step cannot complete now, the function returns and
 * waits for the next event (the event is persistent) or for the timer.
 * @param fd: unused.
 * @param bits: the event bits, UB_EV_READ triggers the close check.
 * @param arg: the struct stop_flush_info of the flush in progress.
 */
void dtio_stop_ev_cb(int ATTR_UNUSED(fd), short bits, void* arg)
{
struct stop_flush_info* info = (struct stop_flush_info*)arg;
struct dt_io_thread* dtio = info->dtio;
/* the loop was already asked to exit, ignore events that still arrive */
if(info->want_to_exit_flush)
return;
if(dtio->check_nb_connect) {
int connect_err = dtio_check_nb_connect(dtio);
if(connect_err == -1) {
/* connect check failed: exit the flush and close the output */
dtio_stop_flush_exit(info);
dtio_del_output_event(dtio);
dtio_close_output(dtio);
return;
} else if(connect_err == 0) {
/* no result yet, wait for the next event */
return;
}
/* any other value: continue below */
}
#ifdef HAVE_SSL
/* handshake not finished, or a brief read was requested for it */
if(dtio->ssl &&
(!dtio->ssl_handshake_done || dtio->ssl_brief_read)) {
if(!dtio_ssl_handshake(dtio, info))
return;
}
#endif
if((bits&UB_EV_READ)) {
if(!dtio_check_close(dtio)) {
/* fd == -1 means the output has been closed, so there is nothing
 * left to flush to; otherwise just wait for the next event */
if(dtio->fd == -1) {
verbose(VERB_ALGO, "dnstap io: "
"stop flush: output closed");
dtio_stop_flush_exit(info);
}
return;
}
}
/* finish the message that was being written when the stop started */
if(dtio->cur_msg) {
if(dtio->cur_msg_done < dtio->cur_msg_len) {
if(!dtio_write_more(dtio)) {
if(dtio->fd == -1) {
verbose(VERB_ALGO, "dnstap io: "
"stop flush: output closed");
dtio_stop_flush_exit(info);
}
return;
}
}
verbose(VERB_ALGO, "dnstap io: stop flush completed "
"last frame");
dtio_cur_msg_free(dtio);
}
/* then write the STOP control frame */
if(info->stop_frame_done < info->stop_frame_len) {
if(!dtio_control_stop_send(info))
return;
verbose(VERB_ALGO, "dnstap io: stop flush completed "
"stop control frame");
}
/* everything has been written, end the flush loop */
dtio_stop_flush_exit(info);
}
/**
 * Flush the output before shutdown: finish the message in progress and
 * send the STOP control frame.
 * This runs its own temporary event base with two events: a 2 second timer
 * (dtio_stop_timer_cb) that bounds the wait, and a read/write event on the
 * output fd (dtio_stop_ev_cb) that performs the writes.  The function
 * returns when that event loop exits.
 * Nothing is done when there is no connection, when the nonblocking
 * connect has not been checked yet, or when the TLS handshake has not
 * completed.  Allocation failures are logged and the flush is skipped.
 * @param dtio: the io thread structure.
 */
static void dtio_control_stop_flush(struct dt_io_thread* dtio)
{
	time_t secs = 0;
	struct timeval now;
	struct timeval timeout;
	struct stop_flush_info info;
	struct ub_event* timer;
	struct ub_event* stopev;

	if(dtio->fd == -1 || dtio->check_nb_connect)
		return;
	if(dtio->ssl && !dtio->ssl_handshake_done)
		return;

	memset(&info, 0, sizeof(info));
	memset(&now, 0, sizeof(now));
	info.dtio = dtio;
	info.base = ub_default_event_base(0, &secs, &now);
	if(!info.base) {
		log_err("dnstap io: malloc failure");
		return;
	}

	/* timer that bounds how long the flush may take */
	timer = ub_event_new(info.base, -1, UB_EV_TIMEOUT,
		&dtio_stop_timer_cb, &info);
	if(!timer) {
		log_err("dnstap io: malloc failure");
		goto free_base;
	}
	memset(&timeout, 0, sizeof(timeout));
	timeout.tv_sec = 2;
	if(ub_timer_add(timer, info.base, &dtio_stop_timer_cb, &info,
		&timeout) != 0) {
		log_err("dnstap io: cannot event_timer_add");
		goto free_timer;
	}

	/* event on the output fd that performs the writes */
	stopev = ub_event_new(info.base, dtio->fd, UB_EV_READ |
		UB_EV_WRITE | UB_EV_PERSIST, &dtio_stop_ev_cb, &info);
	if(!stopev) {
		log_err("dnstap io: malloc failure");
		goto del_timer;
	}
	if(ub_event_add(stopev, NULL) != 0) {
		log_err("dnstap io: cannot event_add");
		goto free_stopev;
	}

	info.stop_frame = fstrm_create_control_frame_stop(
		&info.stop_frame_len);
	if(!info.stop_frame) {
		log_err("dnstap io: malloc failure");
		goto del_stopev;
	}
	dtio->stop_flush_event = stopev;

	/* run until the callbacks request loopexit */
	verbose(VERB_ALGO, "dnstap io: stop flush started");
	if(ub_event_base_dispatch(info.base) < 0) {
		log_err("dnstap io: dispatch flush failed, errno is %s",
			strerror(errno));
	}
	verbose(VERB_ALGO, "dnstap io: stop flush ended");
	free(info.stop_frame);
	dtio->stop_flush_event = NULL;

	/* teardown in reverse order of setup; the error paths above jump
	 * to the label that matches how far setup had progressed */
del_stopev:
	ub_event_del(stopev);
free_stopev:
	ub_event_free(stopev);
del_timer:
	ub_timer_del(timer);
free_timer:
	ub_event_free(timer);
free_base:
	ub_event_base_free(info.base);
}
/**
 * Tear down what the io thread set up: flush and close the output, remove
 * the command event and the read end of the command pipe, and free the
 * reconnect timer and the message in progress.
 * @param dtio: the io thread structure.
 */
static void dtio_desetup(struct dt_io_thread* dtio)
{
/* flush first, while the output is still open */
dtio_control_stop_flush(dtio);
dtio_del_output_event(dtio);
dtio_close_output(dtio);
ub_event_del(dtio->command_event);
ub_event_free(dtio->command_event);
/* close the read end of the command pipe */
#ifndef USE_WINSOCK
close(dtio->commandpipe[0]);
#else
_close(dtio->commandpipe[0]);
#endif
dtio->commandpipe[0] = -1;
dtio_reconnect_del(dtio);
ub_event_free(dtio->reconnect_timer);
dtio_cur_msg_free(dtio);
/* the event base is only freed when threads are enabled; without threads
 * the base was passed in by the caller of dt_io_thread_start */
#ifndef THREADS_DISABLED
ub_event_base_free(dtio->event_base);
#endif
}
/**
 * Queue a START control frame as the current message to write.
 * There must be no current message when this is called.
 * @param dtio: the io thread structure; cur_msg, cur_msg_len and the
 *	progress counters are set.
 * @return 1 on success, 0 when the frame could not be allocated.
 */
static int dtio_control_start_send(struct dt_io_thread* dtio)
{
	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
	dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
		&dtio->cur_msg_len);
	if(dtio->cur_msg != NULL) {
		dtio->cur_msg_done = 0;
		/* NOTE(review): 4 presumably marks the 4 byte length
		 * prefix as already sent, because the control frame
		 * carries its own framing - confirm in dtio_write_more */
		dtio->cur_msg_len_done = 4;
		return 1;
	}
	return 0;
}
/**
 * Queue a READY control frame as the current message to write.
 * There must be no current message when this is called.
 * @param dtio: the io thread structure; cur_msg, cur_msg_len and the
 *	progress counters are set.
 * @return 1 on success, 0 when the frame could not be allocated.
 */
static int dtio_control_ready_send(struct dt_io_thread* dtio)
{
	log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
	dtio->cur_msg = fstrm_create_control_frame_ready(DNSTAP_CONTENT_TYPE,
		&dtio->cur_msg_len);
	if(dtio->cur_msg != NULL) {
		dtio->cur_msg_done = 0;
		/* NOTE(review): 4 presumably marks the 4 byte length
		 * prefix as already sent, because the control frame
		 * carries its own framing - confirm in dtio_write_more */
		dtio->cur_msg_len_done = 4;
		return 1;
	}
	return 0;
}
/**
 * Open the output as a nonblocking connection to the unix domain socket
 * at dtio->socket_path.
 * A connect failure is logged, except on repeated reconnect attempts
 * (reconnect_timeout above the minimum) when verbosity is below 4.
 * @param dtio: the io thread structure; fd is set on success.
 * @return 1 when connected, 0 on failure (the fd is closed again), or when
 *	unix sockets are not available in this build.
 */
static int dtio_open_output_local(struct dt_io_thread* dtio)
{
#ifdef HAVE_SYS_UN_H
	struct sockaddr_un sa;
	int quiet_retry;

	dtio->fd = socket(AF_LOCAL, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("dnstap io: failed to create socket: %s",
			sock_strerror(errno));
		return 0;
	}

	memset(&sa, 0, sizeof(sa));
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
	sa.sun_len = (unsigned)sizeof(sa);
#endif
	sa.sun_family = AF_LOCAL;
	/* the path is truncated to fit sun_path */
	(void)strlcpy(sa.sun_path, dtio->socket_path, sizeof(sa.sun_path));

	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&sa, (socklen_t)sizeof(sa))
		!= -1)
		return 1;

	/* keep the log quiet for the retries on low verbosity */
	quiet_retry = (dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN
		&& verbosity < 4);
	if(!quiet_retry) {
		log_err("dnstap io: failed to connect to \"%s\": %s",
			dtio->socket_path, sock_strerror(errno));
	}
	dtio_close_fd(dtio);
	return 0;
#else
	log_err("cannot create af_local socket");
	return 0;
#endif
}
/**
 * Open the output as a nonblocking TCP connection to dtio->ip_str.
 * A connect that is still in progress counts as success; the result of
 * the connect is checked later.
 * A connect failure is logged, except on repeated reconnect attempts
 * (reconnect_timeout above the minimum) when verbosity is below 4.
 * @param dtio: the io thread structure; fd is set on success.
 * @return 1 when connected or when the connect is in progress, 0 on
 *	failure (the fd is closed again).
 */
static int dtio_open_output_tcp(struct dt_io_thread* dtio)
{
	struct sockaddr_storage addr;
	socklen_t addrlen;
	memset(&addr, 0, sizeof(addr));
	addrlen = (socklen_t)sizeof(addr);

	if(!extstrtoaddr(dtio->ip_str, &addr, &addrlen, UNBOUND_DNS_PORT)) {
		log_err("could not parse IP '%s'", dtio->ip_str);
		return 0;
	}
	dtio->fd = socket(addr.ss_family, SOCK_STREAM, 0);
	if(dtio->fd == -1) {
		log_err("can't create socket: %s", sock_strerror(errno));
		return 0;
	}
	fd_set_nonblock(dtio->fd);
	if(connect(dtio->fd, (struct sockaddr*)&addr, addrlen) == -1) {
		if(errno == EINPROGRESS)
			return 1; /* wait until connect done */
#ifdef USE_WINSOCK
		/* Winsock reports an in-progress nonblocking connect via
		 * WSAGetLastError.  This has to be tested before the
		 * low-verbosity early return below, otherwise every retry
		 * closes a socket whose connect is proceeding normally. */
		if(WSAGetLastError() == WSAEINPROGRESS ||
			WSAGetLastError() == WSAEWOULDBLOCK)
			return 1; /* wait until connect done */
#endif
		if(dtio->reconnect_timeout > DTIO_RECONNECT_TIMEOUT_MIN &&
			verbosity < 4) {
			dtio_close_fd(dtio);
			return 0; /* no log for retries on low verbosity */
		}
#ifndef USE_WINSOCK
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, strerror(errno));
		}
#else
		if(tcp_connect_errno_needs_log(
			(struct sockaddr *)&addr, addrlen)) {
			log_err("dnstap io: failed to connect to %s: %s",
				dtio->ip_str, wsa_strerror(WSAGetLastError()));
		}
#endif
		dtio_close_fd(dtio);
		return 0;
	}
	return 1;
}
/**
 * Create the SSL structure for the newly opened output fd and set the
 * name of the server to authenticate.
 * @param dtio: the io thread structure; ssl is set on success and the
 *	handshake state is reset.
 * @return 1 on success.  0 on failure, in which case dtio->ssl is NULL.
 *	The fd is not closed here, that is left to the caller.
 */
static int dtio_setup_ssl(struct dt_io_thread* dtio)
{
	dtio->ssl = outgoing_ssl_fd(dtio->ssl_ctx, dtio->fd);
	if(!dtio->ssl) return 0;
	dtio->ssl_handshake_done = 0;
	dtio->ssl_brief_read = 0;

	if(!set_auth_name_on_ssl(dtio->ssl, dtio->tls_server_name,
		dtio->tls_use_sni)) {
		/* Release the SSL object here: the caller only closes the
		 * fd on failure, and the next connection attempt would
		 * overwrite the pointer and leak the object. */
#ifdef HAVE_SSL
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
#endif
		return 0;
	}
	return 1;
}
/**
 * Open the output channel (unix socket, TCP or TLS), create the output
 * event and queue the first control frame: READY for a bidirectional
 * channel, START otherwise.
 * On any failure what was opened so far is released again and the
 * reconnect is enabled, so that the open is retried later.
 * The output event is created here but not added to the event base.
 * @param dtio: the io thread structure.
 */
static void dtio_open_output(struct dt_io_thread* dtio)
{
	struct ub_event* ev;
	int queued;

	if(dtio->upstream_is_unix) {
		if(!dtio_open_output_local(dtio))
			goto retry_later;
	} else if(dtio->upstream_is_tcp || dtio->upstream_is_tls) {
		if(!dtio_open_output_tcp(dtio))
			goto retry_later;
		if(dtio->upstream_is_tls && !dtio_setup_ssl(dtio))
			goto close_fd;
	}
	/* the nonblocking connect may not have completed yet, the
	 * callbacks check its result before they use the fd */
	dtio->check_nb_connect = 1;

	ev = ub_event_new(dtio->event_base, dtio->fd,
		UB_EV_READ | UB_EV_WRITE | UB_EV_PERSIST, &dtio_output_cb,
		dtio);
	if(!ev) {
		log_err("dnstap io: out of memory");
		goto free_ssl;
	}
	dtio->event = ev;

	/* exactly one of the two control frames is queued */
	if(dtio->is_bidirectional)
		queued = dtio_control_ready_send(dtio);
	else
		queued = dtio_control_start_send(dtio);
	if(!queued) {
		log_err("dnstap io: out of memory");
		ub_event_free(dtio->event);
		dtio->event = NULL;
		goto free_ssl;
	}
	return;

	/* error exits, each label releases one more resource */
free_ssl:
	if(dtio->ssl) {
#ifdef HAVE_SSL
		SSL_free(dtio->ssl);
		dtio->ssl = NULL;
#endif
	}
close_fd:
	dtio_close_fd(dtio);
retry_later:
	dtio_reconnect_enable(dtio);
}
/**
 * Set up the io thread on its event base: the command pipe event, the
 * reconnect timer and the output connection, then enable the output event
 * for writing.
 * @param dtio: the io thread structure, with event_base already set.
 */
static void dtio_setup_on_base(struct dt_io_thread* dtio)
{
	dtio_setup_cmd(dtio);
	dtio_setup_reconnect(dtio);
	dtio_open_output(dtio);
	/* the result is not acted on here, there is nothing further to
	 * set up either way */
	(void)dtio_add_output_event_write(dtio);
}
#ifndef THREADS_DISABLED
/**
 * Main function of the dnstap io thread.
 * Creates the event base, sets up the events on it, runs the event loop
 * until it exits, and then tears everything down again.
 * @param arg: the struct dt_io_thread for this thread.
 * @return always NULL.
 */
static void* dnstap_io(void* arg)
{
	struct dt_io_thread* io = (struct dt_io_thread*)arg;
	/* time storage handed to the event base; these locals stay in
	 * scope until after the base is torn down in dtio_desetup */
	struct timeval base_now;
	time_t base_secs = 0;
	int rc;

	log_thread_set(&io->threadnum);
	verbose(VERB_ALGO, "start dnstap io thread");
	dtio_setup_base(io, &base_secs, &base_now);
	dtio_setup_on_base(io);

	rc = ub_event_base_dispatch(io->event_base);
	if(rc < 0) {
		log_err("dnstap io: dispatch failed, errno is %s",
			strerror(errno));
	}

	verbose(VERB_ALGO, "stop dnstap io thread");
	dtio_desetup(io);
	return NULL;
}
#endif
/**
 * Start the dnstap io handling.
 * Creates the command pipe.  With threads enabled a thread is created
 * that runs dnstap_io on its own event base; without threads the events
 * are set up on the event base that is passed in.
 * @param dtio: the io thread structure.
 * @param event_base_nothr: event base to use when threads are disabled,
 *	unused otherwise.
 * @param numworkers: number of worker threads; the io thread gets thread
 *	number numworkers+1.
 * @return 1 on success, 0 when the command pipe could not be created.
 */
int dt_io_thread_start(struct dt_io_thread* dtio, void* event_base_nothr,
	int numworkers)
{
	int piperes;

#ifndef USE_WINSOCK
	piperes = pipe(dtio->commandpipe);
	if(piperes == -1) {
		log_err("failed to create pipe: %s", strerror(errno));
		return 0;
	}
#else
	piperes = _pipe(dtio->commandpipe, 4096, _O_BINARY);
	if(piperes == -1) {
		log_err("failed to create _pipe: %s",
			wsa_strerror(WSAGetLastError()));
		return 0;
	}
#endif

	/* mark as started before the thread runs */
	dtio->started = 1;
	dtio->threadnum = numworkers+1;
#ifndef THREADS_DISABLED
	(void)event_base_nothr;
	ub_thread_create(&dtio->tid, dnstap_io, dtio);
#else
	dtio->event_base = event_base_nothr;
	dtio_setup_on_base(dtio);
#endif
	return 1;
}
/**
 * Stop the dnstap io handling.
 * With threads enabled the stop command is written to the command pipe,
 * the write end of the pipe is closed and the thread is joined.  Without
 * threads the teardown is performed directly.
 * Does nothing when dtio is NULL or when it was not started, which also
 * makes a repeated call harmless.
 * @param dtio: the io thread structure, may be NULL.
 */
void dt_io_thread_stop(struct dt_io_thread* dtio)
{
#ifndef THREADS_DISABLED
	uint8_t cmd = DTIO_COMMAND_STOP;
#endif
	if(!dtio) return;
	if(!dtio->started) return;
	verbose(VERB_ALGO, "dnstap io: send stop cmd");

#ifndef THREADS_DISABLED
	/* retry the one byte write while it is interrupted or would block */
	while(1) {
		ssize_t r = write(dtio->commandpipe[1], &cmd, sizeof(cmd));
		if(r == -1) {
#ifndef USE_WINSOCK
			if(errno == EINTR || errno == EAGAIN)
				continue;
#else
			if(WSAGetLastError() == WSAEINPROGRESS)
				continue;
			if(WSAGetLastError() == WSAEWOULDBLOCK)
				continue;
#endif
			log_err("dnstap io stop: write: %s",
				sock_strerror(errno));
			break;
		}
		break;
	}
#endif
	/* Clear the started flag in every build.  Without threads it used
	 * to stay set, so a second call would get past the guard above
	 * and run the teardown again on already freed events. */
	dtio->started = 0;

#ifndef USE_WINSOCK
	close(dtio->commandpipe[1]);
#else
	_close(dtio->commandpipe[1]);
#endif
	dtio->commandpipe[1] = -1;
#ifndef THREADS_DISABLED
	ub_thread_join(dtio->tid);
#else
	dtio->want_to_exit = 1;
	dtio_desetup(dtio);
#endif
}