/*
 * Copyright 2022 Haiku Inc. All rights reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *		Niels Sascha Reedijk, niels.reedijk@gmail.com
 */
#include <algorithm>
#include <atomic>
#include <deque>
#include <list>
#include <map>
#include <optional>
#include <vector>
#include <AutoLocker.h>
#include <DataIO.h>
#include <ErrorsExt.h>
#include <HttpFields.h>
#include <HttpRequest.h>
#include <HttpResult.h>
#include <HttpSession.h>
#include <Locker.h>
#include <Messenger.h>
#include <NetBuffer.h>
#include <NetServicesDefs.h>
#include <NetworkAddress.h>
#include <OS.h>
#include <SecureSocket.h>
#include <Socket.h>
#include <ZlibCompressionAlgorithm.h>
#include "HttpBuffer.h"
#include "HttpParser.h"
#include "HttpResultPrivate.h"
#include "HttpSerializer.h"
#include "NetServicesPrivate.h"
using namespace std::literals;
using namespace BPrivate::Network;
/*!
	\brief Maximum size of the HTTP Header lines of the message.

	In the RFC there is no maximum, but we need to prevent the situation where we keep growing the
	internal buffer waiting for the end of line ('\r\n') characters to occur.
*/
// 64 KiB. NOTE(review): no code visible in this file references this constant — confirm whether
// it is still needed here.
static constexpr ssize_t kMaxHeaderLineSize = 64 * 1024;
/*!
	\brief Deleter used for the per-host connection counter a request points at.

	Nothing is freed: "deleting" the pointer atomically decrements the counter it refers to.
*/
struct CounterDeleter {
	void
	operator()(int32* counter) const noexcept
	{
		atomic_add(counter, -1);
	}
};
// Internal state of a single HTTP request while it is handled by the session's threads: the
// request itself, the shared result object, the socket, and the serializer/parser state used to
// send the request and to receive the response.
class BHttpSession::Request
{
public:
// Wraps a new request; the constructor allocates the shared HttpResultPrivate for it.
Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
// Builds the follow-up request for a redirect: moves the BHttpRequest out of `original` and
// shares its result object and observer.
Request(Request& original, const Redirect& redirect);
// InitialState is the default; OpenConnection() sets Connected, TransferRequest() sets
// RequestSent once the serializer is complete, ReceiveResult() sets ContentReceived.
enum RequestState { InitialState, Connected, RequestSent, ContentReceived };
RequestState State() const noexcept { return fRequestStatus; }
std::shared_ptr<HttpResultPrivate> Result() { return fResult; }
// Stores the exception in the result and notifies the observer.
void SetError(std::exception_ptr e);
// Host and port as reported by the request URL.
std::pair<BString, int> GetHost() const;
// Takes over `counter`; CounterDeleter decrements it when this request lets go of it.
void SetCounter(int32* counter) noexcept;
void ResolveHostName();
void OpenConnection();
void TransferRequest();
// Returns true once the request is finished.
bool ReceiveResult();
void Disconnect() noexcept;
// Dereferences fSocket, which is only created by OpenConnection().
int Socket() const noexcept { return fSocket->Socket(); }
int32 Id() const noexcept { return fResult->id; }
bool CanCancel() const noexcept { return fResult->CanCancel(); }
// Sends `what` to the observer if it is valid; `dataFunc` may add fields to the message.
void SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc = nullptr) const;
private:
BHttpRequest fRequest;
RequestState fRequestStatus = InitialState;
BMessenger fObserver;
std::shared_ptr<HttpResultPrivate> fResult;
BNetworkAddress fRemoteAddress;
std::unique_ptr<BSocket> fSocket;
HttpBuffer fBuffer;
HttpSerializer fSerializer;
HttpParser fParser;
BHttpStatus fStatus;
BHttpFields fFields;
// Set by ReceiveResult() when the status code is a redirect and redirects remain.
bool fMightRedirect = false;
// No default initializer: both constructors assign this in their body.
int8 fRemainingRedirects;
// Points at the per-host counter in Impl::fConnectionCount; decremented on release.
std::unique_ptr<int32, CounterDeleter> fConnectionCounter;
};
// Shared implementation behind BHttpSession. Owns two threads: the control thread resolves host
// names and opens connections, the data thread transfers requests and receives responses.
class BHttpSession::Impl
{
public:
Impl();
~Impl() noexcept;
// Queues a request for the control thread and returns the result handle for it.
BHttpResult Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer);
// Cancels the request with the given identifier, whether still queued or already connected.
void Cancel(int32 identifier);
void SetMaxConnectionsPerHost(size_t maxConnections);
void SetMaxHosts(size_t maxConnections);
private:
static status_t ControlThreadFunc(void* arg);
static status_t DataThreadFunc(void* arg);
// Moves the requests that fit within the host/connection limits out of fControlQueue.
std::vector<BHttpSession::Request> GetRequestsForControlThread();
private:
// Released whenever the control thread should (re)examine fControlQueue.
const sem_id fControlQueueSem;
// Released whenever the data thread should examine fDataQueue or fCancelList.
const sem_id fDataQueueSem;
const thread_id fControlThread;
const thread_id fDataThread;
// Guards fControlQueue, fDataQueue and fCancelList in the code visible in this file.
BLocker fLock;
// Set by the destructor before the semaphores are deleted.
std::atomic<bool> fQuitting = false;
// Requests waiting for the control thread.
std::list<BHttpSession::Request> fControlQueue;
// Connected requests handed from the control thread to the data thread.
std::deque<BHttpSession::Request> fDataQueue;
// Identifiers of requests the data thread still has to cancel.
std::vector<int32> fCancelList;
using Host = std::pair<BString, int>;
// Per-host counter of active connections; requests hold a pointer to their host's counter.
std::map<Host, int32> fConnectionCount;
std::atomic<size_t> fMaxConnectionsPerHost = 2;
std::atomic<size_t> fMaxHosts = 10;
// Socket descriptor -> request; only touched by DataThreadFunc in the visible code.
std::map<int, BHttpSession::Request> connectionMap;
// Wait list passed to wait_for_objects(); entry 0 is the data queue semaphore.
std::vector<object_wait_info> objectList;
};
// Thrown by Request::ReceiveResult() when a response must be followed as a redirect, and caught
// by the data thread, which queues a new Request built from it.
struct BHttpSession::Redirect {
// Target of the redirect, built from the request URL and the Location field.
BUrl url;
// True for the Found and SeeOther status codes: the follow-up request is switched to GET
// (unless it already is GET or HEAD).
bool redirectToGet;
};
/*!
	\brief Create the semaphores and both worker threads, then start the threads.

	The checks run in a fixed order: both semaphores, then creating and resuming the control
	thread, then creating and resuming the data thread.

	\exception BRuntimeError A semaphore or thread could not be created, or a thread could not be
		resumed.

	NOTE(review): when this throws, the destructor does not run, so semaphores and threads that
	were already created (or already resumed) are not cleaned up here.
*/
BHttpSession::Impl::Impl()
	:
	fControlQueueSem(create_sem(0, "http:control")),
	fDataQueueSem(create_sem(0, "http:data")),
	fControlThread(spawn_thread(ControlThreadFunc, "http:control", B_NORMAL_PRIORITY, this)),
	fDataThread(spawn_thread(DataThreadFunc, "http:data", B_NORMAL_PRIORITY, this))
{
	// Captured here so that the error origin names the constructor and not the helper lambda.
	const char* const origin = __PRETTY_FUNCTION__;
	const auto require = [origin](bool ok, const char* message) {
		if (!ok)
			throw BRuntimeError(origin, message);
	};

	require(fControlQueueSem >= 0, "Cannot create control queue semaphore");
	require(fDataQueueSem >= 0, "Cannot create data queue semaphore");

	require(fControlThread >= 0, "Cannot create control thread");
	require(resume_thread(fControlThread) == B_OK, "Cannot resume control thread");

	require(fDataThread >= 0, "Cannot create data thread");
	require(resume_thread(fDataThread) == B_OK, "Cannot resume data thread");
}
/*!
	\brief Shut down the worker threads.

	The quit flag is raised before the semaphores are deleted, so that the threads can tell an
	orderly shutdown from an unexpected semaphore failure. Only the control thread is waited for
	here; the control thread itself waits for the data thread before it exits.
*/
BHttpSession::Impl::~Impl() noexcept
{
	fQuitting = true;

	// Deleting the semaphores wakes up both threads.
	delete_sem(fControlQueueSem);
	delete_sem(fDataQueueSem);

	status_t exitValue = B_OK;
	wait_for_thread(fControlThread, &exitValue);
}
/*!
	\brief Queue \a request for execution and hand back the result object for it.

	\param request The request to execute; it is moved into the internal queue.
	\param target Optional target that receives the body.
	\param observer Messenger that receives progress notifications.
	\returns The BHttpResult that shares its state with the queued request.
*/
BHttpResult
BHttpSession::Impl::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
{
	Request wrapped(std::move(request), std::move(target), observer);

	// Take the result handle before the wrapper is moved into the queue.
	BHttpResult result(wrapped.Result());

	{
		AutoLocker<BLocker> locker(fLock);
		fControlQueue.push_back(std::move(wrapped));
		release_sem(fControlQueueSem);
	}

	return result;
}
/*!
	\brief Cancel the request with the given \a identifier.

	A request that is still waiting in the control queue is failed with a Canceled error and
	removed right away. The identifier is always added to the cancel list as well, and the data
	thread is woken up to handle a request that is already connected.
*/
void
BHttpSession::Impl::Cancel(int32 identifier)
{
	AutoLocker<BLocker> locker(fLock);

	const auto cancelIfMatching = [&identifier](auto& request) {
		if (request.Id() != identifier)
			return false;

		// Throw and catch to obtain an exception_ptr for the result.
		try {
			throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
		} catch (...) {
			request.SetError(std::current_exception());
		}
		return true;
	};
	fControlQueue.remove_if(cancelIfMatching);

	fCancelList.push_back(identifier);
	release_sem(fDataQueueSem);
}
/*!
	\brief Set the maximum number of concurrent connections per host.

	\param maxConnections The new limit; must be at least 1 and smaller than INT32_MAX.
	\exception BRuntimeError The value is out of range.
*/
void
BHttpSession::Impl::SetMaxConnectionsPerHost(size_t maxConnections)
{
	const bool inRange = maxConnections > 0 && maxConnections < INT32_MAX;
	if (!inRange) {
		throw BRuntimeError(
			__PRETTY_FUNCTION__, "MaxConnectionsPerHost must be between 1 and INT32_MAX");
	}

	fMaxConnectionsPerHost.store(maxConnections, std::memory_order_relaxed);
}
/*!
	\brief Set the maximum number of hosts that can be connected to at the same time.

	\param maxConnections The new limit; must be 1 or more.
	\exception BRuntimeError The value is 0.
*/
void
BHttpSession::Impl::SetMaxHosts(size_t maxConnections)
{
	// The parameter is unsigned, so zero is the only invalid value.
	if (maxConnections == 0)
		throw BRuntimeError(__PRETTY_FUNCTION__, "MaxHosts must be 1 or more");

	fMaxHosts.store(maxConnections, std::memory_order_relaxed);
}
/*!
	\brief Entry point of the control thread.

	Each time the control queue semaphore is acquired, the thread collects the requests that may
	start, resolves their host names and opens their connections. Connected requests are handed to
	the data thread; failed ones get the error stored in their result.

	On shutdown the thread waits for the data thread and then cancels whatever is still queued.

	\param arg Pointer to the BHttpSession::Impl that owns the thread.
	\returns B_OK after an orderly shutdown.
	\exception BRuntimeError The semaphore failed while the session was not quitting.
*/
status_t
BHttpSession::Impl::ControlThreadFunc(void* arg)
{
	auto* impl = static_cast<BHttpSession::Impl*>(arg);

	for (;;) {
		const status_t semStatus = acquire_sem(impl->fControlQueueSem);
		if (semStatus == B_INTERRUPTED)
			continue;

		// Any other failure means the semaphore is gone; a raised quit flag ends the loop too.
		if (semStatus != B_OK || impl->fQuitting.load())
			break;

		// An empty batch simply skips the loop below.
		auto batch = impl->GetRequestsForControlThread();
		for (auto& pending: batch) {
			try {
				pending.ResolveHostName();
				pending.OpenConnection();
			} catch (...) {
				pending.SetError(std::current_exception());
				// Run another pass over the control queue after a failed request.
				release_sem(impl->fControlQueueSem);
				continue;
			}

			impl->fLock.Lock();
			impl->fDataQueue.push_back(std::move(pending));
			impl->fLock.Unlock();
			release_sem(impl->fDataQueueSem);
		}
	}

	if (!impl->fQuitting.load()) {
		throw BRuntimeError(
			__PRETTY_FUNCTION__, "Unknown reason that the controlQueueSem is deleted");
	}

	// Orderly shutdown: let the data thread finish first, then fail the queued requests.
	status_t exitValue;
	wait_for_thread(impl->fDataThread, &exitValue);

	for (auto& queued: impl->fControlQueue) {
		try {
			throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
		} catch (...) {
			queued.SetError(std::current_exception());
		}
	}

	return B_OK;
}
// Private marker that the data thread stores in object_wait_info::events for a request whose
// identifier was found in fCancelList; the event loop reacts by disconnecting the request and
// failing it with a Canceled error.
// NOTE(review): presumably chosen so it does not collide with the system B_EVENT_* bits — confirm.
static constexpr uint16 EVENT_CANCELLED = 0x4000;
/*!
	\brief Entry point of the data thread.

	The thread waits on a list of objects: entry 0 is the data queue semaphore, all other entries
	are the sockets of the active requests. When the semaphore fires, newly connected requests are
	taken from the data queue and pending cancellations are marked. After that every socket with an
	event is serviced: writable sockets send (part of) the request, readable sockets receive (part
	of) the response, and disconnected or cancelled sockets are failed and removed.

	At the end of each pass the wait list is rebuilt from the connection map, so between passes
	entry N+1 of the list corresponds to the Nth entry of the map. Within a pass that is not
	guaranteed, because newly queued requests are appended to the end of the list.

	\param arg Pointer to the BHttpSession::Impl that owns the thread.
	\returns B_OK after an orderly shutdown.
	\exception BSystemError wait_for_objects() failed.
	\exception BRuntimeError An unexpected event or request state was found, or the semaphore
		failed while the session was not quitting.
*/
status_t
BHttpSession::Impl::DataThreadFunc(void* arg)
{
	BHttpSession::Impl* data = static_cast<BHttpSession::Impl*>(arg);

	// Entry 0 of the wait list is always the data queue semaphore.
	data->objectList.push_back(
		object_wait_info{data->fDataQueueSem, B_OBJECT_TYPE_SEMAPHORE, B_EVENT_ACQUIRE_SEMAPHORE});

	while (true) {
		if (auto status = wait_for_objects(data->objectList.data(), data->objectList.size());
			status == B_INTERRUPTED)
			continue;
		else if (status < 0) {
			throw BSystemError("wait_for_objects()", status);
		}

		if (data->objectList[0].events == B_EVENT_ACQUIRE_SEMAPHORE) {
			if (auto status = acquire_sem(data->fDataQueueSem); status == B_INTERRUPTED)
				continue;
			else if (status != B_OK) {
				break;
			}

			// Scoped lock, so that the lock is also released when a container operation throws.
			auto lock = AutoLocker<BLocker>(data->fLock);

			// Take over the newly connected requests and start waiting for their sockets.
			while (!data->fDataQueue.empty()) {
				auto request = std::move(data->fDataQueue.front());
				data->fDataQueue.pop_front();
				auto socket = request.Socket();

				data->connectionMap.insert(std::make_pair(socket, std::move(request)));
				data->objectList.push_back(
					object_wait_info{socket, B_OBJECT_TYPE_FD, B_EVENT_WRITE});
			}

			// Mark the requests that were cancelled. The wait list entry is looked up by socket
			// descriptor: the entries appended above are not in connection map order, so the
			// position in the map cannot be used as an index into the wait list.
			for (auto id: data->fCancelList) {
				const auto requestIt = std::find_if(data->connectionMap.cbegin(),
					data->connectionMap.cend(),
					[id](const auto& entry) { return entry.second.Id() == id; });
				if (requestIt == data->connectionMap.cend())
					continue;

				const auto socket = requestIt->first;
				const auto waitIt = std::find_if(data->objectList.begin() + 1,
					data->objectList.end(), [socket](const object_wait_info& info) {
						return info.type == B_OBJECT_TYPE_FD && info.object == socket;
					});
				if (waitIt != data->objectList.end())
					waitIt->events = EVENT_CANCELLED;
			}
			data->fCancelList.clear();
		} else if ((data->objectList[0].events & B_EVENT_INVALID) == B_EVENT_INVALID) {
			// The semaphore was deleted.
			break;
		}

		// Service the sockets that have an event.
		bool resizeObjectList = false;
		for (auto& item: data->objectList) {
			if (item.type != B_OBJECT_TYPE_FD)
				continue;

			if ((item.events & B_EVENT_WRITE) == B_EVENT_WRITE) {
				auto& request = data->connectionMap.find(item.object)->second;
				auto error = false;
				try {
					request.TransferRequest();
				} catch (...) {
					request.SetError(std::current_exception());
					error = true;
				}

				if (error) {
					request.Disconnect();
					data->connectionMap.erase(item.object);
					release_sem(data->fControlQueueSem);
					resizeObjectList = true;
				}
			} else if ((item.events & B_EVENT_READ) == B_EVENT_READ) {
				auto& request = data->connectionMap.find(item.object)->second;
				auto finished = false;
				try {
					if (request.CanCancel())
						finished = true;
					else
						finished = request.ReceiveResult();
				} catch (const Redirect& r) {
					// Queue the follow-up request for the control thread.
					auto lock = AutoLocker<BLocker>(data->fLock);
					data->fControlQueue.emplace_back(request, r);
					release_sem(data->fControlQueueSem);
					finished = true;
				} catch (...) {
					request.SetError(std::current_exception());
					finished = true;
				}

				if (finished) {
					request.Disconnect();
					data->connectionMap.erase(item.object);
					release_sem(data->fControlQueueSem);
					resizeObjectList = true;
				}
			} else if ((item.events & B_EVENT_DISCONNECTED) == B_EVENT_DISCONNECTED) {
				auto& request = data->connectionMap.find(item.object)->second;
				try {
					throw BNetworkRequestError(
						__PRETTY_FUNCTION__, BNetworkRequestError::NetworkError);
				} catch (...) {
					request.SetError(std::current_exception());
				}
				data->connectionMap.erase(item.object);
				// A connection slot was freed: wake the control thread like every other branch
				// that ends a request, so that requests waiting for a slot are reconsidered.
				release_sem(data->fControlQueueSem);
				resizeObjectList = true;
			} else if ((item.events & EVENT_CANCELLED) == EVENT_CANCELLED) {
				auto& request = data->connectionMap.find(item.object)->second;
				request.Disconnect();
				try {
					throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
				} catch (...) {
					request.SetError(std::current_exception());
				}
				data->connectionMap.erase(item.object);
				release_sem(data->fControlQueueSem);
				resizeObjectList = true;
			} else if (item.events == 0) {
				// No events for this socket in this pass.
				continue;
			} else {
				auto& request = data->connectionMap.find(item.object)->second;
				request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
					msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
					msg.AddString(UrlEventData::DebugMessage, "Unexpected event; socket deleted?");
				});
				throw BRuntimeError(
					__PRETTY_FUNCTION__, "Socket was deleted at an unexpected time");
			}
		}

		// Rebuild the wait list for the next pass, in connection map order.
		data->objectList[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
		if (resizeObjectList)
			data->objectList.resize(data->connectionMap.size() + 1);

		size_t index = 1;
		for (auto it = data->connectionMap.cbegin(); it != data->connectionMap.cend(); it++) {
			data->objectList[index].object = it->first;
			if (it->second.State() == Request::InitialState)
				throw BRuntimeError(__PRETTY_FUNCTION__, "Invalid state of request");
			else if (it->second.State() == Request::Connected)
				data->objectList[index].events = B_EVENT_WRITE | B_EVENT_DISCONNECTED;
			else
				data->objectList[index].events = B_EVENT_READ | B_EVENT_DISCONNECTED;
			index++;
		}
	}

	if (data->fQuitting.load()) {
		// Orderly shutdown: fail all requests that are still active.
		for (auto it = data->connectionMap.begin(); it != data->connectionMap.end(); it++) {
			try {
				throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::Canceled);
			} catch (...) {
				it->second.SetError(std::current_exception());
			}
		}
	} else {
		throw BRuntimeError(__PRETTY_FUNCTION__, "Unknown reason that the dataQueueSem is deleted");
	}

	return B_OK;
}
\brief Internal helper that filters the lists of requests to guard against the concurrent
requests limit.
This method will do the locking of the internal structure.
*/
std::vector<BHttpSession::Request>
BHttpSession::Impl::GetRequestsForControlThread()
{
std::vector<BHttpSession::Request> requests;
if (fConnectionCount.size() >= fMaxHosts.load()) {
for (auto it = fConnectionCount.begin(); it != fConnectionCount.end();) {
if (atomic_get(std::addressof(it->second)) == 0) {
it = fConnectionCount.erase(it);
} else {
it++;
}
}
}
auto lock = AutoLocker<BLocker>(fLock);
fControlQueue.remove_if([this, &requests](auto& request) {
auto host = request.GetHost();
auto it = fConnectionCount.find(host);
if (it != fConnectionCount.end()) {
if (static_cast<size_t>(atomic_get(std::addressof(it->second)))
>= fMaxConnectionsPerHost.load(std::memory_order_relaxed)) {
request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
msg.AddString(UrlEventData::DebugMessage,
"Request is queued: too many active connections for host");
});
return false;
} else {
atomic_add(std::addressof(it->second), 1);
request.SetCounter(std::addressof(it->second));
}
} else {
if (fConnectionCount.size() == fMaxHosts.load()) {
request.SendMessage(UrlEvent::DebugMessage, [](BMessage& msg) {
msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugWarning);
msg.AddString(UrlEventData::DebugMessage,
"Request is queued: maximum number of concurrent hosts");
});
return false;
}
auto [newIt, success] = fConnectionCount.insert({host, 1});
if (!success) {
throw BRuntimeError(__PRETTY_FUNCTION__, "Cannot insert into fConnectionCount");
}
request.SetCounter(std::addressof(newIt->second));
}
requests.emplace_back(std::move(request));
return true;
});
return requests;
}
/*!
	\brief Create a new session with its own implementation object.

	\exception BRuntimeError Thrown by the Impl constructor when its semaphores or threads cannot
		be set up.
*/
BHttpSession::BHttpSession()
	:
	fImpl(std::make_shared<BHttpSession::Impl>())
{
}
// The session's implementation object is created with std::make_shared in the constructor, so
// the compiler-generated destructor and copy operations are used: a copy refers to the same Impl
// as the object it was copied from.
BHttpSession::~BHttpSession() = default;
BHttpSession::BHttpSession(const BHttpSession&) noexcept = default;
BHttpSession& BHttpSession::operator=(const BHttpSession&) noexcept = default;
// Public entry point: forwards the request, the optional body target and the observer to the
// implementation object and returns the result handle it produces.
BHttpResult
BHttpSession::Execute(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
{
return fImpl->Execute(std::move(request), std::move(target), observer);
}
// Cancels the request with the given identifier by forwarding to the implementation object.
void
BHttpSession::Cancel(int32 identifier)
{
fImpl->Cancel(identifier);
}
// Convenience overload: cancels the request that belongs to the given result, using the
// identifier reported by BHttpResult::Identity().
void
BHttpSession::Cancel(const BHttpResult& request)
{
fImpl->Cancel(request.Identity());
}
// Forwards to the implementation object, which throws BRuntimeError when the value is not
// between 1 and INT32_MAX.
void
BHttpSession::SetMaxConnectionsPerHost(size_t maxConnections)
{
fImpl->SetMaxConnectionsPerHost(maxConnections);
}
// Forwards to the implementation object, which throws BRuntimeError when the value is 0.
void
BHttpSession::SetMaxHosts(size_t maxConnections)
{
fImpl->SetMaxHosts(maxConnections);
}
/*!
	\brief Wrap a new request for processing by the session.

	A fresh result object with a new request identifier is allocated, and the redirect budget is
	taken from the request.

	\param request The request; it is moved into this object.
	\param target Optional body target; stored in the result when it has a value.
	\param observer Messenger that receives the notifications for this request.
*/
BHttpSession::Request::Request(BHttpRequest&& request, BBorrow<BDataIO> target, BMessenger observer)
	:
	fRequest(std::move(request)),
	fObserver(observer)
{
	fResult = std::make_shared<HttpResultPrivate>(get_netservices_request_identifier());
	fRemainingRedirects = fRequest.MaxRedirections();

	if (target.HasValue())
		fResult->bodyTarget = std::move(target);

	// A response to a HEAD request has no body to parse.
	if (fRequest.Method() == BHttpMethod::Head)
		fParser.SetNoContent();
}
/*!
	\brief Create the follow-up request for a redirect.

	The BHttpRequest is moved out of \a original, and the observer and result object are shared
	with it. The URL is replaced by the redirect target. When the redirect asks for it, any method
	other than GET or HEAD is switched to GET and the request body is dropped.

	\param original The request that received the redirect response. Its BHttpRequest is moved
		from, so it must not be used for further transfers.
	\param redirect The redirect target and whether to switch to GET.
*/
BHttpSession::Request::Request(Request& original, const BHttpSession::Redirect& redirect)
	:
	fRequest(std::move(original.fRequest)),
	fObserver(original.fObserver),
	fResult(original.fResult)
{
	fRequest.SetUrl(redirect.url);

	if (redirect.redirectToGet
		&& (fRequest.Method() != BHttpMethod::Head && fRequest.Method() != BHttpMethod::Get)) {
		fRequest.SetMethod(BHttpMethod::Get);
		fRequest.ClearRequestBody();
	}

	// Following this redirect uses up one of the remaining redirects. A post-decrement on the
	// original would copy the value from before the decrement, so the budget would never shrink
	// and a redirect loop would be followed forever.
	fRemainingRedirects = static_cast<int8>(original.fRemainingRedirects - 1);

	// A response to a HEAD request has no body to parse.
	if (fRequest.Method() == BHttpMethod::Head)
		fParser.SetNoContent();
}
\brief Helper that sets the error in the result to \a e and notifies the listeners.
*/
void
BHttpSession::Request::SetError(std::exception_ptr e)
{
fResult->SetError(e);
SendMessage(UrlEvent::DebugMessage, [&e](BMessage& msg) {
msg.AddUInt32(UrlEventData::DebugType, UrlEventData::DebugError);
try {
std::rethrow_exception(e);
} catch (BError& error) {
msg.AddString(UrlEventData::DebugMessage, error.DebugMessage());
} catch (std::exception& error) {
msg.AddString(UrlEventData::DebugMessage, error.what());
} catch (...) {
msg.AddString(UrlEventData::DebugMessage, "Unknown exception");
}
});
SendMessage(UrlEvent::RequestCompleted,
[](BMessage& msg) { msg.AddBool(UrlEventData::Success, false); });
}
/*!
	\brief Get the host name and port of the request URL.

	The pair is used as the key for the per-host connection counters.
*/
std::pair<BString, int>
BHttpSession::Request::GetHost() const
{
	const auto& url = fRequest.Url();
	return std::pair<BString, int>(url.Host(), url.Port());
}
/*!
	\brief Attach the connection counter of this request's host.

	The counter is not incremented here. It is decremented by the deleter when this request
	releases the pointer, which includes replacing a counter that was set before.
*/
void
BHttpSession::Request::SetCounter(int32* counter) noexcept
{
	fConnectionCounter.reset(counter);
}
\brief Resolve the hostname for a request
*/
void
BHttpSession::Request::ResolveHostName()
{
int port;
if (fRequest.Url().HasPort())
port = fRequest.Url().Port();
else if (fRequest.Url().Protocol() == "https")
port = 443;
else
port = 80;
if (auto status = fRemoteAddress.SetTo(fRequest.Url().Host(), port); status != B_OK) {
throw BNetworkRequestError(
"BNetworkAddress::SetTo()", BNetworkRequestError::HostnameError, status);
}
SendMessage(UrlEvent::HostNameResolved,
[this](BMessage& msg) { msg.AddString(UrlEventData::HostName, fRequest.Url().Host()); });
}
\brief Open the connection and make the socket non-blocking after opening it
*/
void
BHttpSession::Request::OpenConnection()
{
if (fRequest.Url().Protocol() == "https") {
fSocket = std::make_unique<BSecureSocket>();
} else {
fSocket = std::make_unique<BSocket>();
}
fSocket->SetTimeout(fRequest.Timeout());
if (auto status = fSocket->Connect(fRemoteAddress); status != B_OK) {
throw BNetworkRequestError(
"BSocket::Connect()", BNetworkRequestError::NetworkError, status);
}
auto flags = fcntl(fSocket->Socket(), F_GETFL, 0);
if (flags == -1)
throw BRuntimeError("fcntl()", "Error getting socket flags");
if (fcntl(fSocket->Socket(), F_SETFL, flags | O_NONBLOCK) != 0)
throw BRuntimeError("fcntl()", "Error setting non-blocking flag on socket");
SendMessage(UrlEvent::ConnectionOpened);
fRequestStatus = Connected;
}
\brief Transfer data from the request to the socket.
\returns \c true if the request is complete, or false if there is more.
*/
void
BHttpSession::Request::TransferRequest()
{
if (fRequestStatus != Connected)
throw BRuntimeError(
__PRETTY_FUNCTION__, "Write request for object that is not in the Connected state");
if (!fSerializer.IsInitialized())
fSerializer.SetTo(fBuffer, fRequest);
auto currentBytesWritten = fSerializer.Serialize(fBuffer, fSocket.get());
if (currentBytesWritten > 0) {
SendMessage(UrlEvent::UploadProgress, [this](BMessage& msg) {
msg.AddInt64(UrlEventData::NumBytes, fSerializer.BodyBytesTransferred());
if (auto totalSize = fSerializer.BodyBytesTotal())
msg.AddInt64(UrlEventData::TotalBytes, totalSize.value());
});
}
if (fSerializer.Complete())
fRequestStatus = RequestSent;
}
\brief Transfer data from the socket and parse the result.
\returns \c true if the request is complete, or false if there is more.
*/
bool
BHttpSession::Request::ReceiveResult()
{
auto bytesRead = fBuffer.ReadFrom(fSocket.get());
if (bytesRead == B_WOULD_BLOCK || bytesRead == B_INTERRUPTED)
return false;
auto readEnd = bytesRead == 0;
switch (fParser.State()) {
case HttpInputStreamState::StatusLine:
{
if (fBuffer.RemainingBytes() == static_cast<size_t>(bytesRead)) {
SendMessage(UrlEvent::ResponseStarted);
}
if (fParser.ParseStatus(fBuffer, fStatus)) {
if (fRemainingRedirects > 0) {
switch (fStatus.StatusCode()) {
case BHttpStatusCode::MovedPermanently:
case BHttpStatusCode::TemporaryRedirect:
case BHttpStatusCode::PermanentRedirect:
if (!fRequest.RewindBody())
break;
[[fallthrough]];
case BHttpStatusCode::Found:
case BHttpStatusCode::SeeOther:
fMightRedirect = true;
break;
default:
break;
}
}
if ((fStatus.StatusClass() == BHttpStatusClass::ClientError
|| fStatus.StatusClass() == BHttpStatusClass::ServerError)
&& fRequest.StopOnError()) {
fRequestStatus = ContentReceived;
fResult->SetStatus(std::move(fStatus));
fResult->SetFields(BHttpFields());
fResult->SetBody();
SendMessage(UrlEvent::RequestCompleted,
[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
return true;
}
if (!fMightRedirect) {
SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
});
fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
}
} else {
if (readEnd) {
throw BNetworkRequestError(__PRETTY_FUNCTION__,
BNetworkRequestError::ProtocolError,
"Response did not include a complete status line");
}
return false;
}
[[fallthrough]];
}
case HttpInputStreamState::Fields:
{
if (!fParser.ParseFields(fBuffer, fFields)) {
if (readEnd) {
throw BNetworkRequestError(__PRETTY_FUNCTION__,
BNetworkRequestError::ProtocolError,
"Response did not include a complete header section");
}
break;
}
if (fMightRedirect) {
auto redirectToGet = false;
switch (fStatus.StatusCode()) {
case BHttpStatusCode::Found:
case BHttpStatusCode::SeeOther:
redirectToGet = true;
[[fallthrough]];
case BHttpStatusCode::MovedPermanently:
case BHttpStatusCode::TemporaryRedirect:
case BHttpStatusCode::PermanentRedirect:
{
auto locationField = fFields.FindField("Location");
if (locationField == fFields.end()) {
throw BNetworkRequestError(__PRETTY_FUNCTION__,
BNetworkRequestError::ProtocolError,
"Redirect; the Location field must be present and cannot be found");
}
auto locationString = BString(
(*locationField).Value().data(), (*locationField).Value().size());
auto redirect = BHttpSession::Redirect{
BUrl(fRequest.Url(), locationString), redirectToGet};
if (!redirect.url.IsValid()) {
throw BNetworkRequestError(__PRETTY_FUNCTION__,
BNetworkRequestError::ProtocolError,
"Redirect; invalid URL in the Location field");
}
SendMessage(UrlEvent::HttpRedirect, [&locationString](BMessage& msg) {
msg.AddString(UrlEventData::HttpRedirectUrl, locationString);
});
throw redirect;
}
default:
SendMessage(UrlEvent::HttpStatus, [this](BMessage& msg) {
msg.AddInt16(UrlEventData::HttpStatusCode, fStatus.code);
});
fResult->SetStatus(BHttpStatus{fStatus.code, std::move(fStatus.text)});
break;
}
}
fResult->SetFields(std::move(fFields));
SendMessage(UrlEvent::HttpFields);
if (!fParser.HasContent()) {
fResult->SetBody();
SendMessage(UrlEvent::RequestCompleted,
[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
fRequestStatus = ContentReceived;
return true;
}
[[fallthrough]];
}
case HttpInputStreamState::Body:
{
size_t bytesWrittenToBody;
bytesRead = fParser.ParseBody(
fBuffer,
[this, &bytesWrittenToBody](const std::byte* buffer, size_t size) {
bytesWrittenToBody = fResult->WriteToBody(buffer, size);
return bytesWrittenToBody;
},
readEnd);
SendMessage(UrlEvent::DownloadProgress, [this, bytesRead](BMessage& msg) {
msg.AddInt64(UrlEventData::NumBytes, bytesRead);
if (fParser.BodyBytesTotal())
msg.AddInt64(UrlEventData::TotalBytes, fParser.BodyBytesTotal().value());
});
if (bytesWrittenToBody > 0) {
SendMessage(UrlEvent::BytesWritten, [bytesWrittenToBody](BMessage& msg) {
msg.AddInt64(UrlEventData::NumBytes, bytesWrittenToBody);
});
}
if (fParser.Complete()) {
fResult->SetBody();
SendMessage(UrlEvent::RequestCompleted,
[](BMessage& msg) { msg.AddBool(UrlEventData::Success, true); });
fRequestStatus = ContentReceived;
return true;
} else if (readEnd) {
throw BNetworkRequestError(__PRETTY_FUNCTION__, BNetworkRequestError::ProtocolError,
"Unexpected end of data: more data was expected");
}
break;
}
default:
throw BRuntimeError(__PRETTY_FUNCTION__, "Not reachable");
}
return false;
}
/*!
	\brief Disconnect the socket. Does not validate if it actually succeeded.
*/
void
BHttpSession::Request::Disconnect() noexcept
{
// NOTE(review): fSocket is dereferenced unconditionally. It is only created in
// OpenConnection(), so this must not be called on a request that never connected.
fSocket->Disconnect();
}
\brief Send a message to the observer, if one is present
\param what The code of the message to be sent
\param dataFunc Optional function that adds additional data to the message.
*/
void
BHttpSession::Request::SendMessage(uint32 what, std::function<void(BMessage&)> dataFunc) const
{
if (fObserver.IsValid()) {
BMessage msg(what);
msg.AddInt32(UrlEventData::Id, fResult->id);
if (dataFunc)
dataFunc(msg);
fObserver.SendMessage(&msg);
}
}
// Definitions of the HTTP-specific field names that are used in the messages sent to observers.
namespace BPrivate {
namespace Network {
namespace UrlEventData {

const char* HttpStatusCode = "url:httpstatuscode";
const char* SSLCertificate = "url:sslcertificate";
const char* SSLMessage = "url:sslmessage";
const char* HttpRedirectUrl = "url:httpredirecturl";

} // namespace UrlEventData
} // namespace Network
} // namespace BPrivate