Initial attempt at threading of BeginRequest

This commit is contained in:
archshift 2015-05-08 23:55:32 -07:00
parent 619400fb5b
commit 24cc45c246

View File

@ -3,6 +3,8 @@
// Refer to the license.txt file included. // Refer to the license.txt file included.
#include <stdexcept> #include <stdexcept>
#include <thread>
#include <mutex>
#include <curl/curl.h> #include <curl/curl.h>
@ -32,15 +34,20 @@ enum RequestType : u32 {
}; };
enum RequestState : u32 { enum RequestState : u32 {
REQUEST_STATE_DEFAULT, // TODO
REQUEST_STATE_IN_PROGRESS = 5, REQUEST_STATE_IN_PROGRESS = 5,
REQUEST_STATE_READY = 7, REQUEST_STATE_READY = 7,
}; };
struct HttpContext { struct HttpContext {
std::mutex mutex;
RequestState state;
std::string url; std::string url;
RequestType req_type; RequestType req_type;
RequestState req_state;
curl_slist* request_hdrs; curl_slist* request_hdrs;
std::vector<u8> response_hdrs; std::vector<u8> response_hdrs;
std::vector<u8> response_data; std::vector<u8> response_data;
long response_code; long response_code;
@ -52,14 +59,24 @@ struct HttpContext {
} }
}; };
static std::unordered_map<ContextHandle, std::unique_ptr<HttpContext>> handle_map; static std::unordered_map<ContextHandle, std::unique_ptr<HttpContext>> context_map;
static ContextHandle next_handle; static ContextHandle next_handle;
static int writer_callback(u8* data, size_t size, size_t nmemb, std::vector<u8>* writer_data) { static int writer_hdrs(u8* data, size_t size, size_t nmemb, HttpContext* context) {
if (writer_data == nullptr) if (context == nullptr)
return 0; return 0;
writer_data->reserve(writer_data->size() + size * nmemb); std::lock_guard<std::mutex> lock(context->mutex);
writer_data->insert(writer_data->end(), data, data + size * nmemb); context->response_hdrs.reserve(context->response_hdrs.size() + size * nmemb);
context->response_hdrs.insert(context->response_hdrs.end(), data, data + size * nmemb);
return (int)(size * nmemb);
}
static int writer_data(u8* data, size_t size, size_t nmemb, HttpContext* context) {
if (context == nullptr)
return 0;
std::lock_guard<std::mutex> lock(context->mutex);
context->response_data.reserve(context->response_data.size() + size * nmemb);
context->response_data.insert(context->response_data.end(), data, data + size * nmemb);
return (int)(size * nmemb); return (int)(size * nmemb);
} }
@ -80,19 +97,16 @@ static void CreateContext(Service::Interface* self) {
std::string url(reinterpret_cast<const char*>(Memory::GetPointer(url_ptr)), url_len); std::string url(reinterpret_cast<const char*>(Memory::GetPointer(url_ptr)), url_len);
// This should never even happen in the first place with 64-bit handles, LOG_DEBUG(Service, "request url=%s req_type=%u",
while (handle_map.count(next_handle) != 0) {
++next_handle;
}
LOG_DEBUG(Service, "context url=%s req_type=%u",
url.c_str(), req_type); url.c_str(), req_type);
HttpContext* context = new HttpContext({}); // TODO: give HttpContext a proper constructor
std::unique_ptr<HttpContext> context(new HttpContext{});
context->req_type = req_type; context->req_type = req_type;
context->url = url; context->url = url;
handle_map.emplace(next_handle, std::unique_ptr<HttpContext>(context)); context_map.emplace(next_handle, std::move(context));
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
cmd_buff[2] = next_handle; cmd_buff[2] = next_handle;
@ -104,12 +118,13 @@ static void CloseContext(Service::Interface* self) {
u32* cmd_buff = Kernel::GetCommandBuffer(); u32* cmd_buff = Kernel::GetCommandBuffer();
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]); ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = handle_map.find(handle); // auto map_it = request_map.find(handle);
if (map_it == handle_map.end()) { // if (map_it == request_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle // cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; // return;
} // }
handle_map.erase(handle); // TODO: wait for thread to finish!
context_map.erase(handle);
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
} }
@ -119,14 +134,16 @@ static void GetRequestState(Service::Interface* self) {
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]); ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = handle_map.find(handle); auto map_it = context_map.find(handle);
if (map_it == handle_map.end()) { if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; return;
} }
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
cmd_buff[2] = map_it->second->req_state;
std::lock_guard<std::mutex> lock(map_it->second->mutex);
cmd_buff[2] = map_it->second->state;
} }
static void GetDownloadSizeState(Service::Interface* self) { static void GetDownloadSizeState(Service::Interface* self) {
@ -134,36 +151,31 @@ static void GetDownloadSizeState(Service::Interface* self) {
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]); ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = handle_map.find(handle); auto map_it = context_map.find(handle);
if (map_it == handle_map.end()) { if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; return;
} }
HttpContext* context = map_it->second.get(); HttpContext* response = map_it->second.get();
std::lock_guard<std::mutex> lock(response->mutex);
// TODO: Request not done yet
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
cmd_buff[2] = (u32)context->downloaded_size; cmd_buff[2] = (u32) response->downloaded_size;
// TODO: invalid content-length? // TODO: invalid content-length?
cmd_buff[3] = (u32)context->download_size; cmd_buff[3] = (u32) response->download_size;
return; return;
} }
static void BeginRequest(Service::Interface* self) { static void MakeRequest(HttpContext* context) {
u32* cmd_buff = Kernel::GetCommandBuffer();
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = handle_map.find(handle);
if (map_it == handle_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return;
}
HttpContext* context = map_it->second.get();
CURL* connection = curl_easy_init(); CURL* connection = curl_easy_init();
CURLcode res; CURLcode res;
{
std::lock_guard<std::mutex> lock(context->mutex);
context->state = REQUEST_STATE_IN_PROGRESS;
res = curl_easy_setopt(connection, CURLOPT_URL, context->url.c_str()); res = curl_easy_setopt(connection, CURLOPT_URL, context->url.c_str());
switch (context->req_type) { switch (context->req_type) {
case REQUEST_TYPE_GET: case REQUEST_TYPE_GET:
@ -176,27 +188,53 @@ static void BeginRequest(Service::Interface* self) {
case REQUEST_TYPE_PUT: case REQUEST_TYPE_PUT:
case REQUEST_TYPE_PUT_: case REQUEST_TYPE_PUT_:
res = curl_easy_setopt(connection, CURLOPT_UPLOAD, 1); res = curl_easy_setopt(connection, CURLOPT_UPLOAD, 1);
break;
case REQUEST_TYPE_DELETE: case REQUEST_TYPE_DELETE:
// TODO
break; break;
case REQUEST_TYPE_HEAD: case REQUEST_TYPE_HEAD:
res = curl_easy_setopt(connection, CURLOPT_NOBODY, 1); res = curl_easy_setopt(connection, CURLOPT_NOBODY, 1);
break; break;
} }
res = curl_easy_setopt(connection, CURLOPT_WRITEFUNCTION, writer_callback); res = curl_easy_setopt(connection, CURLOPT_HEADERFUNCTION, writer_hdrs);
res = curl_easy_setopt(connection, CURLOPT_WRITEHEADER, &context->response_hdrs); res = curl_easy_setopt(connection, CURLOPT_WRITEFUNCTION, writer_data);
res = curl_easy_setopt(connection, CURLOPT_WRITEDATA, &context->response_data); res = curl_easy_setopt(connection, CURLOPT_HEADERDATA, context);
res = curl_easy_setopt(connection, CURLOPT_WRITEDATA, context);
curl_easy_setopt(connection, CURLOPT_HTTPHEADER, context->request_hdrs); curl_easy_setopt(connection, CURLOPT_HTTPHEADER, context->request_hdrs);
}
context->req_state = REQUEST_STATE_IN_PROGRESS;
res = curl_easy_perform(connection); res = curl_easy_perform(connection);
context->req_state = REQUEST_STATE_READY; {
std::lock_guard<std::mutex> lock(context->mutex);
curl_easy_getinfo(connection, CURLINFO_RESPONSE_CODE, &context->response_code); curl_easy_getinfo(connection, CURLINFO_RESPONSE_CODE, &context->response_code);
curl_easy_getinfo(connection, CURLINFO_SIZE_DOWNLOAD, &context->downloaded_size); curl_easy_getinfo(connection, CURLINFO_SIZE_DOWNLOAD, &context->downloaded_size);
curl_easy_getinfo(connection, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &context->download_size); curl_easy_getinfo(connection, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &context->download_size);
context->state = REQUEST_STATE_READY;
}
curl_easy_cleanup(connection);
}
static void BeginRequest(Service::Interface* self) {
u32* cmd_buff = Kernel::GetCommandBuffer();
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = context_map.find(handle);
if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return;
}
HttpContext* context = map_it->second.get();
std::thread req_thread(MakeRequest, context);
req_thread.detach();
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
} }
@ -207,12 +245,13 @@ static void ReceiveData(Service::Interface* self) {
u32 buf_size = cmd_buff[2]; u32 buf_size = cmd_buff[2];
u8* buffer = Memory::GetPointer(cmd_buff[4]); u8* buffer = Memory::GetPointer(cmd_buff[4]);
auto map_it = handle_map.find(handle); auto map_it = context_map.find(handle);
if (map_it == handle_map.end()) { if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; return;
} }
std::lock_guard<std::mutex> lock(map_it->second->mutex);
const std::vector<u8>& data = map_it->second->response_data; const std::vector<u8>& data = map_it->second->response_data;
if (buf_size < data.size()) { if (buf_size < data.size()) {
cmd_buff[1] = 0xd840a02b; cmd_buff[1] = 0xd840a02b;
@ -239,12 +278,13 @@ static void AddRequestHeader(Service::Interface* self) {
std::string(reinterpret_cast<const char*>(Memory::GetPointer(hdr_name_buf)), hdr_name_len).c_str(), std::string(reinterpret_cast<const char*>(Memory::GetPointer(hdr_name_buf)), hdr_name_len).c_str(),
std::string(reinterpret_cast<const char*>(Memory::GetPointer(hdr_val_buf)), hdr_val_len).c_str()); std::string(reinterpret_cast<const char*>(Memory::GetPointer(hdr_val_buf)), hdr_val_len).c_str());
auto map_it = handle_map.find(handle); auto map_it = context_map.find(handle);
if (map_it == handle_map.end()) { if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; return;
} }
std::lock_guard<std::mutex> lock(map_it->second->mutex);
map_it->second->request_hdrs = curl_slist_append(map_it->second->request_hdrs, header.c_str()); map_it->second->request_hdrs = curl_slist_append(map_it->second->request_hdrs, header.c_str());
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
} }
@ -254,12 +294,22 @@ static void GetResponseStatusCode(Service::Interface* self) {
ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]); ContextHandle handle = static_cast<ContextHandle>(cmd_buff[1]);
auto map_it = handle_map.find(handle); auto map_it = context_map.find(handle);
if (map_it == handle_map.end()) { if (map_it == context_map.end()) {
cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle cmd_buff[1] = -1; // TODO: Find proper result code for invalid handle
return; return;
} }
while (true) {
std::lock_guard<std::mutex> lock(map_it->second->mutex);
if (map_it->second->state == REQUEST_STATE_READY)
break;
}
std::lock_guard<std::mutex> lock(map_it->second->mutex);
// TODO: Request not done yet
cmd_buff[1] = RESULT_SUCCESS.raw; cmd_buff[1] = RESULT_SUCCESS.raw;
cmd_buff[2] = (u32)map_it->second->response_code; cmd_buff[2] = (u32)map_it->second->response_code;
} }
@ -267,7 +317,7 @@ static void GetResponseStatusCode(Service::Interface* self) {
static void Finalize(Service::Interface* self) { static void Finalize(Service::Interface* self) {
u32* cmd_buff = Kernel::GetCommandBuffer(); u32* cmd_buff = Kernel::GetCommandBuffer();
handle_map.clear(); context_map.clear();
next_handle = 0; next_handle = 0;
curl_global_cleanup(); curl_global_cleanup();