#include <algorithm>
#include <atomic>
#include <deque>
#include <list>
#include <map>
#include <optional>
#include <vector>
#include <AutoLocker.h>
#include <DataIO.h>
#include <ErrorsExt.h>
#include <HttpFields.h>
#include <HttpRequest.h>
#include <HttpResult.h>
#include <HttpSession.h>
#include <Locker.h>
#include <Messenger.h>
#include <NetBuffer.h>
#include <NetServicesDefs.h>
#include <NetworkAddress.h>
#include <OS.h>
#include <SecureSocket.h>
#include <Socket.h>
#include <ZlibCompressionAlgorithm.h>
#include "HttpBuffer.h"
#include "HttpParser.h"
#include "HttpResultPrivate.h"
#include "HttpSerializer.h"
#include "NetServicesPrivate.h"
using namespace std::literals;
using namespace BPrivate::Network;
// Size limit of 64 KiB, named for a single HTTP header line.
// NOTE(review): not referenced anywhere in the visible part of this file;
// confirm whether it is still needed.
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
// "Deleter" used with std::unique_ptr<int32, CounterDeleter>: nothing is
// freed. When the owning pointer lets go of the counter, the counter is
// atomically decremented by one.
struct CounterDeleter {
	void operator()(int32* counter) const noexcept
	{
		atomic_add(counter, -1);
	}
};
// Bookkeeping for one HTTP request while it is handled by the session: the
// request itself, its socket, the serializer/parser pair, and the shared result
// object that is handed back to the caller.
class BHttpSession::Request
{
public:
// Wraps a fresh request. `target` (if it holds a value) becomes the body
// target of the result; `observer` receives the messages sent by SendMessage().
Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
// Builds the follow-up request for a redirect. Takes over the original's
// BHttpRequest (moved out of `original`), observer and result object.
Request(Request& original, const Redirect& redirect);
// Progress of the request: InitialState until OpenConnection() succeeds
// (Connected), RequestSent once the serializer is complete, ContentReceived
// once ReceiveResult() finished the response.
enum RequestState { InitialState, Connected, RequestSent, ContentReceived };
RequestState State() const noexcept { return fRequestStatus; }
// Shared result object; also what BHttpResult is constructed from.
std::shared_ptr<HttpResultPrivate> Result() { return fResult; }
// Stores the error in the result and notifies the observer.
void SetError(std::exception_ptr e);
// Host name and port of the request URL, used as key for connection counting.
std::pair<BString, int> GetHost() const;
// Hands over a per-host connection counter that is decremented (see
// CounterDeleter) when this request lets go of it.
void SetCounter(int32* counter) noexcept;
// The following four drive the request through its life cycle.
void ResolveHostName();
void OpenConnection();
void TransferRequest();
// Returns true when the response has been received completely.
bool ReceiveResult();
void Disconnect() noexcept;
// File descriptor of the underlying socket; requires fSocket to be set.
int Socket() const noexcept { return fSocket->Socket(); }
int32 Id() const noexcept { return fResult->id; }
// Forwards to HttpResultPrivate::CanCancel().
bool CanCancel() const noexcept { return fResult->CanCancel(); }
// Sends a message with code `what` and the request id to the observer, if the
// observer is valid; `dataFunc` may add extra fields.
void SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc = nullptr) const;
private:
BHttpRequest fRequest;
RequestState fRequestStatus = InitialState;
BMessenger fObserver;
std::shared_ptr<HttpResultPrivate> fResult;
// Filled in by ResolveHostName(), consumed by OpenConnection().
BNetworkAddress fRemoteAddress;
// BSocket or BSecureSocket, created in OpenConnection().
std::unique_ptr<BSocket> fSocket;
// Buffer shared by the serializer (outgoing) and the parser (incoming).
HttpBuffer fBuffer;
HttpSerializer fSerializer;
HttpParser fParser;
// Status and fields are parsed into these members first and moved into
// fResult once it is clear that the response is not a redirect.
BHttpStatus fStatus;
BHttpFields fFields;
// Set while parsing the status line when the status code is a redirect that
// may be followed.
bool fMightRedirect = false;
// Not initialised here; both constructors assign it.
int8 fRemainingRedirects;
std::unique_ptr<int32, CounterDeleter> fConnectionCounter;
};
// Shared implementation behind BHttpSession. Owns two worker threads: the
// control thread resolves host names and opens connections, the data thread
// transfers requests and receives responses on the open sockets.
class BHttpSession::Impl
{
public:
// Creates the semaphores and threads and starts the threads; throws
// BRuntimeError when any of that fails.
Impl();
~Impl() noexcept;
// Queues a request for the control thread and returns its result handle.
BHttpResult Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
// Cancels the request with the given identifier, whether still queued or
// already handled by the data thread.
void Cancel(int32 identifier);
void SetMaxConnectionsPerHost(size_t maxConnections);
void SetMaxHosts(size_t maxConnections);
private:
static status_t ControlThreadFunc(void* arg);
static status_t DataThreadFunc(void* arg);
// Removes from fControlQueue, and returns, the requests that may start now
// given the per-host and host-count limits.
std::vector<BHttpSession::Request> GetRequestsForControlThread();
private:
// NOTE: the constructor's initialiser list relies on this declaration order
// (semaphores first, then threads).
const sem_id fControlQueueSem;
const sem_id fDataQueueSem;
const thread_id fControlThread;
const thread_id fDataThread;
// Guards fControlQueue, fDataQueue and fCancelList.
BLocker fLock;
// Set by the destructor before the semaphores are deleted, so that the
// threads can tell an intentional shutdown from an unexpected failure.
std::atomic<bool> fQuitting = false;
// Requests waiting for the control thread.
std::list<BHttpSession::Request> fControlQueue;
// Connected requests waiting to be picked up by the data thread.
std::deque<BHttpSession::Request> fDataQueue;
// Identifiers that the data thread should cancel.
std::vector<int32> fCancelList;
using Host = std::pair<BString, int>;
// Number of active connections per host; requests hold pointers to the
// mapped values (see Request::SetCounter).
std::map<Host, int32> fConnectionCount;
std::atomic<size_t> fMaxConnectionsPerHost = 2;
std::atomic<size_t> fMaxHosts = 10;
// The two members below are only used by DataThreadFunc in the visible code.
// Active requests keyed by their socket file descriptor.
std::map<int, BHttpSession::Request> connectionMap;
// Wait list for wait_for_objects(); entry 0 is fDataQueueSem, the remaining
// entries are sockets.
std::vector<object_wait_info> objectList;
};
// Thrown by Request::ReceiveResult() and caught in the data thread to signal
// that the request must be re-issued against a new location.
struct BHttpSession::Redirect {
// Location to redirect to.
BUrl url;
// When true, a method other than Get/Head is changed to Get and the request
// body is dropped (see the redirect constructor of Request).
bool redirectToGet;
};
// Creates both queue semaphores and both worker threads, then validates and
// starts them in a fixed order: control semaphore, data semaphore, control
// thread (create + resume), data thread (create + resume). The first failing
// step raises a BRuntimeError.
//
// NOTE(review): when a check fails, the semaphores/threads that were created
// successfully are not released here, and the destructor does not run for a
// throwing constructor — confirm whether cleanup is needed.
BHttpSession::Impl::Impl()
	:
	fControlQueueSem(create_sem(0, "http:control")),
	fDataQueueSem(create_sem(0, "http:data")),
	fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)),
	fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this))
{
	// Captured outside the lambda so the reported origin stays this constructor.
	const char* const origin = __PRETTY_FUNCTION__;
	const auto require = [origin](bool ok, const char* reason) {
		if (!ok)
			throw BRuntimeError(origin, reason);
	};

	require(fControlQueueSem >= 0, "Cannot create control queue semaphore");
	require(fDataQueueSem >= 0, "Cannot create data queue semaphore");

	require(fControlThread >= 0, "Cannot create control thread");
	require(resume_thread(fControlThread) == B_OK, "Cannot resume control thread");

	require(fDataThread >= 0, "Cannot create data thread");
	require(resume_thread(fDataThread) == B_OK, "Cannot resume data thread");
}
// Shuts the session down: raises the quit flag, deletes both queue semaphores
// (which makes the worker threads' semaphore waits fail) and then blocks until
// the control thread has exited. The control thread itself waits for the data
// thread before it returns.
BHttpSession::Impl::~Impl() noexcept
{
	// The flag must be set before the semaphores go away, so the threads see
	// an intentional shutdown.
	fQuitting.store(true);

	delete_sem(fControlQueueSem);
	delete_sem(fDataQueueSem);

	status_t controlThreadExit = B_OK;
	wait_for_thread(fControlThread, &controlThreadExit);
}
// Wraps `request` in the internal Request bookkeeping object, appends it to the
// control queue and wakes the control thread.
//
// Returns the BHttpResult that shares state with the queued request.
BHttpResult
BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
{
	Request pending(std::move(request), std::move(target), observer);

	// Take the result handle before the request is moved into the queue.
	BHttpResult handle(pending.Result());

	AutoLocker<BLocker> guard(fLock);
	fControlQueue.push_back(std::move(pending));
	release_sem(fControlQueueSem);

	return handle;
}
// Cancels the request identified by `identifier`.
//
// A request that is still waiting in the control queue is failed with a
// BNetworkRequestError::Canceled error and removed right away. The identifier
// is also always recorded in the cancel list, and the data thread is woken so
// that it can cancel the request if it is already being transferred.
void
BHttpSession::Impl::Cancel(int32 identifier)
{
	AutoLocker<BLocker> guard(fLock);

	const auto cancelIfMatching = [identifier](auto& queued) {
		if (queued.Id() != identifier)
			return false;

		// Throw-and-catch to obtain an exception_ptr for the result object.
		try {
			throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
		} catch (...) {
			queued.SetError(std::current_exception());
		}
		return true;
	};
	fControlQueue.remove_if(cancelIfMatching);

	fCancelList.push_back(identifier);
	release_sem(fDataQueueSem);
}
// Sets the maximum number of simultaneous connections per host.
//
// Throws BRuntimeError unless 0 < maxConnections < INT32_MAX.
void
BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections)
{
	const bool tooSmall = maxConnections <= 0;
	const bool tooLarge = maxConnections >= INT32_MAX;
	if (tooSmall || tooLarge) {
		throw BRuntimeError(
			__PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX");
	}

	fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed);
}
// Sets the maximum number of distinct hosts that may have connections at the
// same time.
//
// Throws BRuntimeError when maxConnections is zero.
void
BHttpSession::Impl::SetMaxHosts(size_t maxConnections)
{
	// The parameter is unsigned, so zero is the only rejected value.
	const bool valid = maxConnections > 0;
	if (!valid) {
		throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more");
	}

	fMaxHosts.store(maxConnections, std::memory_order_relaxed);
}
// Entry point of the control thread; `arg` is the owning Impl.
//
// Each time the control semaphore is released, the thread collects the requests
// that are allowed to start, resolves their host and opens the connection, and
// hands the connected requests to the data thread. A request that fails at this
// stage gets the error stored in its result.
//
// The loop ends when acquiring the semaphore fails or the quit flag is set. On
// an intentional shutdown the thread waits for the data thread and cancels
// whatever is left in the control queue; otherwise it throws BRuntimeError.
status_t
BHttpSession::Impl::ControlThreadFunc(void* arg)
{
	auto* self = static_cast<BHttpSession::Impl*>(arg);

	for (;;) {
		const status_t acquired = acquire_sem(self->fControlQueueSem);
		if (acquired == B_INTERRUPTED)
			continue;
		if (acquired != B_OK) {
			// Most likely the semaphore was deleted by the destructor.
			break;
		}

		if (self->fQuitting.load())
			break;

		auto batch = self->GetRequestsForControlThread();
		if (batch.empty())
			continue;

		for (auto& pending: batch) {
			try {
				pending.ResolveHostName();
				pending.OpenConnection();
			} catch (...) {
				pending.SetError(std::current_exception());
				// Wake ourselves up again: the failed request frees a
				// connection slot once the batch is destroyed.
				release_sem(self->fControlQueueSem);
				continue;
			}

			self->fLock.Lock();
			self->fDataQueue.push_back(std::move(pending));
			self->fLock.Unlock();
			release_sem(self->fDataQueueSem);
		}
	}

	if (!self->fQuitting.load()) {
		throw BRuntimeError(
			__PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted");
	}

	// Intentional shutdown: let the data thread finish first, then fail every
	// request that never left the control queue.
	status_t dataThreadExit = B_OK;
	wait_for_thread(self->fDataThread, &dataThreadExit);

	for (auto& leftover: self->fControlQueue) {
		try {
			throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
		} catch (...) {
			leftover.SetError(std::current_exception());
		}
	}

	return B_OK;
}
// Private event flag written into object_wait_info::events by the data thread
// to mark a connection whose request was cancelled.
// NOTE(review): assumed not to collide with any system-defined B_EVENT_* bit —
// confirm against the OS headers.
static constexpr uint16 EVENT_CANCELLED = 0x4000;
// Entry point of the data thread; `arg` is the owning Impl.
//
// The thread waits on a list of objects: entry 0 is the data queue semaphore,
// all other entries are the sockets of the active connections. Every pass
//  1. picks up newly connected requests and pending cancellations when the
//     semaphore was released,
//  2. handles the event reported for each socket (write, read, disconnect,
//     cancel), dropping connections that are finished or failed, and
//  3. rebuilds the socket part of the wait list from the connection map.
//
// The loop ends when the semaphore becomes invalid or cannot be acquired. On
// an intentional shutdown all remaining requests are cancelled; otherwise a
// BRuntimeError is thrown. A failing wait_for_objects() throws BSystemError.
status_t
BHttpSession::Impl::DataThreadFunc(void* arg)
{
	BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);

	// Entry 0 of the wait list is always the data queue semaphore.
	data->objectList.push_back(
		object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});

	while (true) {
		if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
			status == B_INTERRUPTED)
			continue;
		else if (status < 0) {
			throw BSystemError("wait_for_objects()", status);
		}

		if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
			if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
				continue;
			else if (status != B_OK) {
				// Most likely the semaphore was deleted by the destructor.
				break;
			}

			data->fLock.Lock();

			// Take over the connected requests. Their wait entries are appended
			// with B_EVENT_WRITE set, so the request is sent in this same pass.
			while (!data->fDataQueue.empty()) {
				auto request = std::move(data->fDataQueue.front());
				data->fDataQueue.pop_front();
				auto socket = request.Socket();

				data->connectionMap.insert(std::make_pair(socket, std::move(request)));
				data->objectList.push_back(
					object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE});
			}

			// Flag cancelled requests. The wait entry is looked up by file
			// descriptor: entries appended above are at the back of the list
			// while the map is ordered by descriptor, so the position in the
			// map does not reliably identify the wait entry.
			for (auto id: data->fCancelList) {
				auto connection = std::find_if(data->connectionMap.cbegin(),
					data->connectionMap.cend(),
					[id](const auto& entry) { return entry.second.Id() == id; });
				if (connection == data->connectionMap.cend())
					continue;

				const int fd = connection->first;
				auto waitEntry = std::find_if(data->objectList.begin() + 1,
					data->objectList.end(), [fd](const object_wait_info& info) {
						return info.type == B_OBJECT_TYPE_FD && info.object == fd;
					});
				if (waitEntry != data->objectList.end())
					waitEntry->events = EVENT_CANCELLED;
			}
			data->fCancelList.clear();

			data->fLock.Unlock();
		} else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
			// The semaphore no longer exists.
			break;
		}

		// Handle the event of every socket. Connections are only erased from
		// the map here; the wait list itself is adjusted after the loop.
		bool resizeObjectList = false;
		for (auto& item: data->objectList) {
			if (item.type != B_OBJECT_TYPE_FD)
				continue;

			if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
				auto& request = data->connectionMap.find(item.object)->second;
				auto error = false;
				try {
					request.TransferRequest();
				} catch (...) {
					request.SetError(std::current_exception());
					error = true;
				}

				if (error) {
					request.Disconnect();
					data->connectionMap.erase(item.object);
					release_sem(data->fControlQueueSem);
					resizeObjectList = true;
				}
			} else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
				auto& request = data->connectionMap.find(item.object)->second;
				auto finished = false;
				try {
					if (request.CanCancel())
						finished = true;
					else
						finished = request.ReceiveResult();
				} catch (const Redirect& r) {
					// Queue the follow-up request for the control thread.
					auto lock = AutoLocker<BLocker>(data->fLock);
					data->fControlQueue.emplace_back(request, r);
					release_sem(data->fControlQueueSem);
					finished = true;
				} catch (...) {
					request.SetError(std::current_exception());
					finished = true;
				}

				if (finished) {
					request.Disconnect();
					data->connectionMap.erase(item.object);
					release_sem(data->fControlQueueSem);
					resizeObjectList = true;
				}
			} else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
				auto& request = data->connectionMap.find(item.object)->second;
				try {
					throw BNetworkRequestError(
						__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
				} catch (...) {
					request.SetError(std::current_exception());
				}
				data->connectionMap.erase(item.object);
				// Like every other branch that drops a connection: wake the
				// control thread, since a connection slot has become free.
				release_sem(data->fControlQueueSem);
				resizeObjectList = true;
			} else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
				auto& request = data->connectionMap.find(item.object)->second;
				request.Disconnect();
				try {
					throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
				} catch (...) {
					request.SetError(std::current_exception());
				}
				data->connectionMap.erase(item.object);
				release_sem(data->fControlQueueSem);
				resizeObjectList = true;
			} else if (item.events == 0) {
				// Nothing happened on this socket.
				continue;
			} else {
				auto& request = data->connectionMap.find(item.object)->second;
				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
					msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?");
				});
				throw BRuntimeError(
					__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
			}
		}

		// Re-arm the semaphore entry and rebuild the socket entries in map
		// order, requesting the event that matches each request's state.
		data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
		if (resizeObjectList)
			data->objectList.resize(data->connectionMap.size() + 1);

		size_t slot = 1;
		for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); ++it) {
			data->objectList[slot].object = it->first;
			if (it->second.State() == Request::InitialState)
				throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
			else if (it->second.State() == Request::Connected)
				data->objectList[slot].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
			else
				data->objectList[slot].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
			++slot;
		}
	}

	if (data->fQuitting.load()) {
		// Intentional shutdown: fail every request that is still active.
		for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); ++it) {
			try {
				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
			} catch (...) {
				it->second.SetError(std::current_exception());
			}
		}
	} else {
		throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted");
	}

	return B_OK;
}
// Moves the requests that may start right now out of the control queue.
//
// A queued request is released when its host has fewer than
// fMaxConnectionsPerHost active connections, or when its host is not yet
// tracked and there is room for another host (fMaxHosts). Released requests
// are counted against their host and receive a pointer to the counter, which
// they decrement again when they let go of it. Requests that have to wait stay
// in the queue and their observer gets a debug message.
//
// Returns the released requests (possibly none). Throws BRuntimeError when a
// new host cannot be inserted into the connection count map.
std::vector<BHttpSession::Request>
BHttpSession::Impl::GetRequestsForControlThread()
{
	std::vector<BHttpSession::Request> requests;

	// When the host table is full, first drop the hosts that no longer have
	// any active connection.
	if (fConnectionCount.size() >= fMaxHosts.load()) {
		for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) {
			if (atomic_get(std::addressof(it->second)) == 0) {
				it = fConnectionCount.erase(it);
			} else {
				++it;
			}
		}
	}

	auto lock = AutoLocker<BLocker>(fLock);
	fControlQueue.remove_if([this, &requests](auto& request) {
		auto host = request.GetHost();
		auto it = fConnectionCount.find(host);
		if (it != fConnectionCount.end()) {
			if (static_cast<size_t>(atomic_get(std::addressof(it->second)))
				>= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) {
				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
					msg.AddString(UrlEventData::DebugMessage,
						"Request is queued: too many active connections for host");
				});
				return false;
			} else {
				atomic_add(std::addressof(it->second), 1);
				request.SetCounter(std::addressof(it->second));
			}
		} else {
			// Use >= rather than ==: the limit can be lowered at runtime through
			// SetMaxHosts(), in which case the table may already hold more
			// hosts than allowed and an equality test would never match.
			if (fConnectionCount.size() >= fMaxHosts.load()) {
				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
					msg.AddString(UrlEventData::DebugMessage,
						"Request is queued: maximum number of concurrent hosts");
				});
				return false;
			}
			auto [newIt, success] = fConnectionCount.insert({host, 1});
			if (!success) {
				throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount");
			}
			request.SetCounter(std::addressof(newIt->second));
		}
		requests.emplace_back(std::move(request));
		return true;
	});
	return requests;
}
// Creates a session with its own implementation object. Creating the
// implementation starts the worker threads and may throw (see Impl::Impl()).
BHttpSession::BHttpSession()
	:
	fImpl(std::make_shared<BHttpSession::Impl>())
{
}
// Destruction and copying are all compiler-generated; they act on fImpl, the
// only member visible in this file.
// NOTE(review): presumably copies share one Impl through a std::shared_ptr (the
// constructor uses std::make_shared) — confirm against the header.
BHttpSession::~BHttpSession() = default;
BHttpSession::BHttpSession(const BHttpSession&) noexcept = default;
BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default;
// Schedules `request` for execution; forwards to the implementation object.
// Returns the handle through which the caller obtains the response.
BHttpResult
BHttpSession::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
{
	Impl& impl = *fImpl;
	return impl.Execute(std::move(request), std::move(target), observer);
}
// Cancels the request with the given identifier; forwards to the
// implementation object.
void
BHttpSession::Cancel(int32 identifier)
{
	Impl& impl = *fImpl;
	impl.Cancel(identifier);
}
// Cancels the request that `request` is the result handle of, using the
// handle's identity as the request identifier.
void
BHttpSession::Cancel(const BHttpResult& request)
{
	const auto identifier = request.Identity();
	fImpl->Cancel(identifier);
}
void
BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections)
{
fImpl->SetMaxConnectionsPerHost(maxConnections);
}
void
BHttpSession::SetMaxHosts(size_t maxConnections)
{
fImpl->SetMaxHosts(maxConnections);
}
// Sets up the bookkeeping for a new request: takes ownership of `request`,
// creates the shared result object under a freshly obtained identifier and
// copies the redirect budget from the request.
//
// `target`, when it holds a value, becomes the body target of the result.
// `observer` receives the progress messages for this request.
BHttpSession::Request::Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
	:
	fRequest(std::move(request)),
	fObserver(observer)
{
	const auto requestId = get_netservices_request_identifier();

	fRemainingRedirects = fRequest.MaxRedirections();
	fResult = std::make_shared<HttpResultPrivate>(requestId);

	if (target.HasValue())
		fResult->bodyTarget = std::move(target);

	// A response to a HEAD request carries no body.
	const bool isHead = fRequest.Method() == BHttpMethod::Head;
	if (isHead)
		fParser.SetNoContent();
}
// Builds the follow-up request for a redirect.
//
// The new request takes over the BHttpRequest of `original` (which is moved
// from and must not be used for sending afterwards), shares its observer and
// result object, and is pointed at `redirect.url`. When the redirect asks for
// it, any method other than Get/Head is replaced by Get and the request body is
// dropped. The redirect budget of the new request is one less than that of the
// original.
BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect)
	:
	fRequest(std::move(original.fRequest)),
	fObserver(original.fObserver),
	fResult(original.fResult)
{
	fRequest.SetUrl(redirect.url);

	if (redirect.redirectToGet
		&& (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) {
		fRequest.SetMethod(BHttpMethod::Get);
		fRequest.ClearRequestBody();
	}

	// Pre-decrement: the new request must receive the reduced budget. A
	// post-decrement would hand over the unchanged value, so the budget would
	// never shrink and a redirect loop would never end.
	fRemainingRedirects = --original.fRemainingRedirects;

	// A response to a HEAD request carries no body.
	if (fRequest.Method() == BHttpMethod::Head)
		fParser.SetNoContent();
}
// Marks the request as failed: stores `e` in the result object and sends the
// observer a debug message describing the error, followed by a
// RequestCompleted message with Success set to false.
void
BHttpSession::Request::SetError(std::exception_ptr e)
{
	fResult->SetError(e);

	// Rethrow the stored exception to find the best available description.
	const auto describeError = [&e](BMessage& msg) {
		msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
		try {
			std::rethrow_exception(e);
		} catch (BError& error) {
			msg.AddString(UrlEventData::DebugMessage, error.DebugMessage());
		} catch (std::exception& error) {
			msg.AddString(UrlEventData::DebugMessage, error.what());
		} catch (...) {
			msg.AddString(UrlEventData::DebugMessage, "Unknown exception");
		}
	};
	SendMessage(UrlEvent::DebugMessage, describeError);

	const auto reportFailure = [](BMessage& msg) {
		msg.AddBool(UrlEventData::Success, false);
	};
	SendMessage(UrlEvent::RequestCompleted, reportFailure);
}
// Returns the host name and port of the request URL as a pair; the session
// uses it as the key for counting connections per host.
std::pair<BString, int>
BHttpSession::Request::GetHost() const
{
	const auto& url = fRequest.Url();
	return {url.Host(), url.Port()};
}
// Attaches the per-host connection counter to this request. The counter is not
// owned as memory: letting go of it decrements it (see CounterDeleter). A
// counter that was attached before is released, and thereby decremented.
void
BHttpSession::Request::SetCounter(int32* counter) noexcept
{
	fConnectionCounter.reset(counter);
}
void
BHttpSession::Request::ResolveHostName()
{
int port;
if (fRequest.Url().HasPort())
port = fRequest.Url().Port();
else if (fRequest.Url().Protocol() == "https")
port = 443;
else
port = 80;
if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) {
throw BNetworkRequestError(
"BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status);
}
SendMessage(UrlEvent::HostNameResolved,
[this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); });
}
// Opens the connection to fRemoteAddress: a secure socket for the "https"
// protocol, a plain socket otherwise. The request timeout is applied before
// connecting. After a successful connect the socket is switched to
// non-blocking mode, the observer gets a ConnectionOpened message and the
// request enters the Connected state.
//
// Throws BNetworkRequestError (NetworkError) when connecting fails and
// BRuntimeError when the socket flags cannot be read or changed.
void
BHttpSession::Request::OpenConnection()
{
	const bool useTls = fRequest.Url().Protocol() == "https";
	if (useTls)
		fSocket = std::make_unique<BSecureSocket>();
	else
		fSocket = std::make_unique<BSocket>();

	fSocket->SetTimeout(fRequest.Timeout());

	const auto connectStatus = fSocket->Connect(fRemoteAddress);
	if (connectStatus != B_OK) {
		throw BNetworkRequestError(
			"BSocket::Connect()", BNetworkRequestError::NetworkError, connectStatus);
	}

	// Add O_NONBLOCK to whatever flags the socket already has.
	const auto currentFlags = fcntl(fSocket->Socket(), F_GETFL, 0);
	if (currentFlags == -1)
		throw BRuntimeError("fcntl()", "Error getting socket flags");

	const auto setResult = fcntl(fSocket->Socket(), F_SETFL, currentFlags | O_NONBLOCK);
	if (setResult != 0)
		throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket");

	SendMessage(UrlEvent::ConnectionOpened);
	fRequestStatus = Connected;
}
// Writes (more of) the serialized request to the socket. The serializer is
// set up on the first call. Whenever bytes were written, the observer gets an
// UploadProgress message; once the serializer reports completion the request
// moves on to the RequestSent state.
//
// Throws BRuntimeError when the request is not in the Connected state.
void
BHttpSession::Request::TransferRequest()
{
	if (fRequestStatus != Connected) {
		throw BRuntimeError(
			__PRETTY_FUNCTION__, "Write request for object that is not in the Connected state");
	}

	if (!fSerializer.IsInitialized())
		fSerializer.SetTo(fBuffer, fRequest);

	const auto written = fSerializer.Serialize(fBuffer, fSocket.get());
	if (written > 0) {
		SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) {
			msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred());
			// The total is only reported when it is known.
			const auto total = fSerializer.BodyBytesTotal();
			if (total)
				msg.AddInt64(UrlEventData::TotalBytes, total.value());
		});
	}

	if (fSerializer.Complete())
		fRequestStatus = RequestSent;
}
// Reads available data from the socket and advances the response parser as
// far as that data allows: status line, then header fields, then body. The
// cases of the switch fall through on purpose, so that one call can complete
// several stages.
//
// Status and fields are only published to the result object once it is clear
// that the response is not a redirect that will be followed.
//
// Returns true when the response is complete (the request is then in the
// ContentReceived state), false when more data is needed.
//
// Throws BNetworkRequestError (ProtocolError) when the connection ends before
// the response is complete or when a redirect has no usable Location field,
// BHttpSession::Redirect when the request must be re-issued at a new location,
// and BRuntimeError for a parser state that should not occur.
bool
BHttpSession::Request::ReceiveResult()
{
	auto bytesRead = fBuffer.ReadFrom(fSocket.get());

	// Nothing to do right now; the data thread will call again.
	if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED)
		return false;

	// A read of zero bytes means that no more data will follow.
	auto readEnd = bytesRead == 0;

	switch (fParser.State()) {
		case HttpInputStreamState::StatusLine:
		{
			// The buffer holds exactly what was just read, so this is the
			// first data of the response.
			if (fBuffer.RemainingBytes() == static_cast<size_t>(bytesRead)) {
				SendMessage(UrlEvent::ResponseStarted);
			}

			if (fParser.ParseStatus(fBuffer, fStatus)) {
				// Decide whether this response may turn into a redirect.
				if (fRemainingRedirects > 0) {
					switch (fStatus.StatusCode()) {
						case BHttpStatusCode::MovedPermanently:
						case BHttpStatusCode::TemporaryRedirect:
						case BHttpStatusCode::PermanentRedirect:
							// These keep the method and body, so the body
							// must be rewindable to be sent again.
							if (!fRequest.RewindBody())
								break;
							[[fallthrough]];
						case BHttpStatusCode::Found:
						case BHttpStatusCode::SeeOther:
							fMightRedirect = true;
							break;
						default:
							break;
					}
				}

				// Stop right after the status line when the caller asked for
				// that on client/server errors.
				if ((fStatus.StatusClass() == BHttpStatusClass::ClientError
						|| fStatus.StatusClass() == BHttpStatusClass::ServerError)
					&& fRequest.StopOnError()) {
					fRequestStatus = ContentReceived;
					fResult->SetStatus(std::move(fStatus));
					fResult->SetFields(BHttpFields());
					fResult->SetBody();
					SendMessage(UrlEvent::RequestCompleted,
						[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
					return true;
				}

				if (!fMightRedirect) {
					SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
						msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
					});
					fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
				}
			} else {
				if (readEnd) {
					throw BNetworkRequestError(__PRETTY_FUNCTION__,
						BNetworkRequestError::ProtocolError,
						"Response did not include a complete status line");
				}
				return false;
			}
			[[fallthrough]];
		}
		case HttpInputStreamState::Fields:
		{
			if (!fParser.ParseFields(fBuffer, fFields)) {
				if (readEnd) {
					throw BNetworkRequestError(__PRETTY_FUNCTION__,
						BNetworkRequestError::ProtocolError,
						"Response did not include a complete header section");
				}
				break;
			}

			if (fMightRedirect) {
				auto redirectToGet = false;
				switch (fStatus.StatusCode()) {
					case BHttpStatusCode::Found:
					case BHttpStatusCode::SeeOther:
						// These two redirects switch the method to Get.
						redirectToGet = true;
						[[fallthrough]];
					case BHttpStatusCode::MovedPermanently:
					case BHttpStatusCode::TemporaryRedirect:
					case BHttpStatusCode::PermanentRedirect:
					{
						auto locationField = fFields.FindField("Location");
						if (locationField == fFields.end()) {
							throw BNetworkRequestError(__PRETTY_FUNCTION__,
								BNetworkRequestError::ProtocolError,
								"Redirect; the Location field must be present and cannot be found");
						}
						auto locationString = BString(
							(*locationField).Value().data(), (*locationField).Value().size());
						// The location is interpreted relative to the current URL.
						auto redirect = BHttpSession::Redirect{
							BUrl(fRequest.Url(), locationString), redirectToGet};
						if (!redirect.url.IsValid()) {
							throw BNetworkRequestError(__PRETTY_FUNCTION__,
								BNetworkRequestError::ProtocolError,
								"Redirect; invalid URL in the Location field");
						}

						SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) {
							msg.AddString(UrlEventData::HttpRedirectUrl, locationString);
						});
						// Caught by the data thread, which queues the follow-up.
						throw redirect;
					}
					default:
						SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
							msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
						});
						fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
						break;
				}
			}

			fResult->SetFields(std::move(fFields));
			SendMessage(UrlEvent::HttpFields);

			if (!fParser.HasContent()) {
				// No body expected: the response is complete.
				fResult->SetBody();
				SendMessage(UrlEvent::RequestCompleted,
					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
				fRequestStatus = ContentReceived;
				return true;
			}
			[[fallthrough]];
		}
		case HttpInputStreamState::Body:
		{
			// Must start at zero: the parser may not call the writer at all
			// (for instance when the buffer holds no body data yet), and the
			// value is read below either way.
			size_t bytesWrittenToBody = 0;
			bytesRead = fParser.ParseBody(
				fBuffer,
				[this, &bytesWrittenToBody](const std::byte* buffer, size_t size) {
					bytesWrittenToBody = fResult->WriteToBody(buffer, size);
					return bytesWrittenToBody;
				},
				readEnd);

			SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) {
				msg.AddInt64(UrlEventData::NumBytes, bytesRead);
				if (fParser.BodyBytesTotal())
					msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value());
			});

			if (bytesWrittenToBody > 0) {
				SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) {
					msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody);
				});
			}

			if (fParser.Complete()) {
				fResult->SetBody();
				SendMessage(UrlEvent::RequestCompleted,
					[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
				fRequestStatus = ContentReceived;
				return true;
			} else if (readEnd) {
				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError,
					"Unexpected end of data: more data was expected");
			}
			break;
		}
		default:
			throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable");
	}
	return false;
}
void
BHttpSession::Request::Disconnect() noexcept
{
fSocket->Disconnect();
}
// Sends a message with code `what` to the observer of this request. The
// message always carries the request id; `dataFunc`, when set, is called to
// add further fields before the message is sent. Does nothing when the
// observer is not valid.
void
BHttpSession::Request::SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc) const
{
	if (!fObserver.IsValid())
		return;

	BMessage notification(what);
	notification.AddInt32(UrlEventData::Id, fResult->id);
	if (dataFunc)
		dataFunc(notification);

	fObserver.SendMessage(&notification);
}
// Definitions of the HTTP-specific field names used in the messages sent to
// request observers.
// NOTE(review): the matching declarations are assumed to live in a header
// that is not visible here.
namespace BPrivate::Network::UrlEventData {
// Added as an int16 to HttpStatus messages.
const char* HttpStatusCode = "url:httpstatuscode";
// Not used in the visible part of this file.
const char* SSLCertificate = "url:sslcertificate";
// Not used in the visible part of this file.
const char* SSLMessage = "url:sslmessage";
// Added as a string to HttpRedirect messages.
const char* HttpRedirectUrl = "url:httpredirecturl";
}