From 597a1071c446bdacf8b7e84508d3964feb5331f9 Mon Sep 17 00:00:00 2001 From: Arun Chandrasekaran Date: Fri, 13 Jan 2017 10:55:08 -0800 Subject: [PATCH] Added example to store large streaming uploads directly to filesystem using async server. --- libs/network/example/CMakeLists.txt | 14 + .../example/http/async_server_file_upload.cpp | 252 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 libs/network/example/http/async_server_file_upload.cpp diff --git a/libs/network/example/CMakeLists.txt b/libs/network/example/CMakeLists.txt index d2068e4bd..12530fa82 100644 --- a/libs/network/example/CMakeLists.txt +++ b/libs/network/example/CMakeLists.txt @@ -20,6 +20,8 @@ add_executable(trivial_google trivial_google.cpp) if (UNIX) add_executable(fileserver http/fileserver.cpp) + add_executable(async_server_file_upload http/async_server_file_upload.cpp) + add_dependencies(async_server_file_upload cppnetlib-server-parsers) endif (UNIX) add_dependencies(http_client cppnetlib-uri cppnetlib-client-connections) add_dependencies(simple_wget cppnetlib-uri cppnetlib-client-connections) @@ -136,6 +138,17 @@ if (UNIX) if (OPENSSL_FOUND) target_link_libraries(fileserver ${OPENSSL_LIBRARIES}) endif(OPENSSL_FOUND) + + target_link_libraries(async_server_file_upload + ${Boost_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + cppnetlib-server-parsers) + if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") + target_link_libraries(async_server_file_upload rt) + endif () + if (OPENSSL_FOUND) + target_link_libraries(async_server_file_upload ${OPENSSL_LIBRARIES}) + endif (OPENSSL_FOUND) endif (UNIX) set_target_properties(http_client PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example) @@ -152,4 +165,5 @@ endif (OPENSSL_FOUND) if (UNIX) set_target_properties(fileserver PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example) + set_target_properties(async_server_file_upload PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example) endif (UNIX) diff --git a/libs/network/example/http/async_server_file_upload.cpp b/libs/network/example/http/async_server_file_upload.cpp new file mode 100644 index 000000000..8224d7680 --- /dev/null +++ b/libs/network/example/http/async_server_file_upload.cpp @@ -0,0 +1,252 @@ +// +// Copyright 2017 (c) Arun Chandrasekaran +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +// +// Example for performing streaming file upload operations directly to +// filesystem using async server +// +// If you use wget, do the following at the client side: +// +// wget localhost:9190/upload?filename=Earth.mp4 +// --post-file=$HOME/Videos/Earth-From-Space.mp4 +// +#include +#include +#include +#include + +#include +#include +#include +#include + +struct connection_handler; + +typedef boost::network::http::server server; + +/// +/// Custom exception type +/// +struct file_uploader_exception : public std::runtime_error { + file_uploader_exception(const std::string err) : + std::runtime_error(err) { + } +}; + +/// +/// Encapsulates request & connection +/// +struct file_uploader : std::enable_shared_from_this { + const server::request& req; + server::connection_ptr conn; + + std::mutex mtx; + std::condition_variable condvar; + + FILE* fp = NULL; + +public: + file_uploader(const server::request& req, const server::connection_ptr& conn) + : req(req) + , conn(conn) { + const std::string dest = destination(req); + + if (dest.find("/upload") != std::string::npos) { + auto queries = get_queries(dest); + auto fname = queries.find("filename"); + if (fname != queries.end()) { + fp = ::fopen(fname->second.c_str(), "wb"); + if (!fp) { + throw file_uploader_exception("Failed to open file to write"); + } + } else { + throw file_uploader_exception("'filename' cannot be empty"); + } + } + } + + ~file_uploader() { + if (fp) { + ::fflush(fp); + ::fclose(fp); + } + } + + /// + /// Non blocking call to initiate the data transfer + /// + void async_recv() { + std::size_t content_length = 0; + auto const& headers = req.headers; + for (auto item : headers) { + if (boost::to_lower_copy(item.name) == "content-length") { + content_length = std::stoll(item.value); + break; + } + } + + read_chunk(conn, content_length); + } + + /// + /// The client shall wait by calling this until the transfer is done by + /// the IO threadpool + /// + void wait_for_completion() { + std::unique_lock _(mtx); + condvar.wait(_); + } + +private: + /// + /// Parses the string and gets the query as a key-value pair + /// + /// @param [in] dest String containing the path and the queries, without the fragment, + /// of the form "/path?key1=value1&key2=value2" + /// + std::map get_queries(const std::string dest) { + + std::size_t pos = dest.find_first_of("?"); + + std::map queries; + if (pos != std::string::npos) { + std::string query_string = dest.substr(pos + 1); + + // Replace '&' with space + for (pos = 0; pos < query_string.size(); pos++) { + if (query_string[pos] == '&') { + query_string[pos] = ' '; + } + } + + std::istringstream sin(query_string); + while (sin >> query_string) { + + pos = query_string.find_first_of("="); + + if (pos != std::string::npos) { + const std::string key = query_string.substr(0, pos); + const std::string value = query_string.substr(pos + 1); + queries[key] = value; + } + } + } + + return queries; + } + + /// + /// Reads a chunk of data + /// + /// @param [in] conn Connection to read from + /// @param [in] left2read Size to read + /// + void read_chunk(server::connection_ptr conn, std::size_t left2read) { + conn->read(boost::bind(&file_uploader::on_data_ready, + file_uploader::shared_from_this(), + _1, _2, _3, conn, left2read)); + } + + /// + /// Callback that gets called when the data is ready to be consumed + /// + void on_data_ready(server::connection::input_range range, + boost::system::error_code error, + std::size_t size, + server::connection_ptr conn, + std::size_t left2read) { + if (!error) { + ::fwrite(boost::begin(range), size, 1, fp); + std::size_t left = left2read - size; + if (left > 0) + read_chunk(conn, left); + else + wakeup(); + } + } + + /// + /// Wakesup the waiting thread + /// + void wakeup() { + std::unique_lock _(mtx); + condvar.notify_one(); + } +}; + +/// +/// Functor that gets executed whenever there is a packet on the HTTP port +/// +struct connection_handler { + /// + /// Gets executed whenever there is a packet on the HTTP port. + /// + /// @param [in] req Request object that holds the protobuf data + /// @param [in] conn Connection object + /// + void operator()(server::request const& req, const server::connection_ptr& conn) { + static std::map headers = { + {"Connection","close"}, + {"Content-Type", "text/plain"} + }; + + const std::string dest = destination(req); + + if (req.method == "POST" && dest.find("/upload") != std::string::npos) { + try { + auto start = std::chrono::high_resolution_clock::now(); + // Create a file uploader + std::shared_ptr uploader(new file_uploader(req, conn)); + // On success to create, start receiving the data + uploader->async_recv(); + // Wait until the data transfer is done by the IO threads + uploader->wait_for_completion(); + + // Respond to the client + conn->set_status(server::connection::ok); + conn->set_headers(headers); + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration diff = end - start; + std::ostringstream stm; + stm << "Took " << diff.count() << " milliseconds for the transfer." << std::endl; + conn->write(stm.str()); + } catch (const file_uploader_exception& e) { + conn->set_status(server::connection::bad_request); + conn->set_headers(headers); + const std::string err = e.what(); + conn->write(err); + } + } else { + conn->set_status(server::connection::bad_request); + conn->set_headers(headers); + conn->write("Only path allowed is /upload."); + } + } +}; + +int main(int ac, const char *av[]) +{ + if (ac != 2) { + std::cerr << "Usage: " << av[0] << " " << std::endl; + return EXIT_SUCCESS; + } + + // Create a connection handler + connection_handler handler; + + // Setup the async server + server local_server(server::options(handler) + .address("0.0.0.0") + .port(av[1]) + .reuse_address(true) + .thread_pool(std::make_shared(2))); + + // Start the server eventloop + local_server.run(); + + return EXIT_SUCCESS; +}