From 67fdec20726e48ba3a934cb25bb30d47ec4a4f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaroslav=20De=20La=20Pe=C3=B1a=20Smirnov?= Date: Wed, 29 Nov 2017 11:44:34 +0300 Subject: Initial commit, version 0.5.3 --- node_modules/uws/src/Asio.h | 184 +++++++++++ node_modules/uws/src/Backend.h | 15 + node_modules/uws/src/Epoll.cpp | 60 ++++ node_modules/uws/src/Epoll.h | 257 ++++++++++++++++ node_modules/uws/src/Extensions.cpp | 131 ++++++++ node_modules/uws/src/Extensions.h | 29 ++ node_modules/uws/src/Group.cpp | 263 ++++++++++++++++ node_modules/uws/src/Group.h | 144 +++++++++ node_modules/uws/src/HTTPSocket.cpp | 310 +++++++++++++++++++ node_modules/uws/src/HTTPSocket.h | 285 +++++++++++++++++ node_modules/uws/src/Hub.cpp | 177 +++++++++++ node_modules/uws/src/Hub.h | 97 ++++++ node_modules/uws/src/Libuv.h | 175 +++++++++++ node_modules/uws/src/Networking.cpp | 78 +++++ node_modules/uws/src/Networking.h | 259 ++++++++++++++++ node_modules/uws/src/Node.cpp | 83 +++++ node_modules/uws/src/Node.h | 198 ++++++++++++ node_modules/uws/src/Socket.cpp | 28 ++ node_modules/uws/src/Socket.h | 507 +++++++++++++++++++++++++++++++ node_modules/uws/src/WebSocket.cpp | 405 ++++++++++++++++++++++++ node_modules/uws/src/WebSocket.h | 89 ++++++ node_modules/uws/src/WebSocketProtocol.h | 377 +++++++++++++++++++++++ node_modules/uws/src/addon.cpp | 24 ++ node_modules/uws/src/addon.h | 464 ++++++++++++++++++++++++++++ node_modules/uws/src/http.h | 357 ++++++++++++++++++++++ node_modules/uws/src/uWS.h | 6 + 26 files changed, 5002 insertions(+) create mode 100644 node_modules/uws/src/Asio.h create mode 100644 node_modules/uws/src/Backend.h create mode 100644 node_modules/uws/src/Epoll.cpp create mode 100644 node_modules/uws/src/Epoll.h create mode 100644 node_modules/uws/src/Extensions.cpp create mode 100644 node_modules/uws/src/Extensions.h create mode 100644 node_modules/uws/src/Group.cpp create mode 100644 node_modules/uws/src/Group.h create mode 100644 node_modules/uws/src/HTTPSocket.cpp create mode 100644 node_modules/uws/src/HTTPSocket.h create mode 100644 node_modules/uws/src/Hub.cpp create mode 100644 node_modules/uws/src/Hub.h create mode 100644 node_modules/uws/src/Libuv.h create mode 100644 node_modules/uws/src/Networking.cpp create mode 100644 node_modules/uws/src/Networking.h create mode 100644 node_modules/uws/src/Node.cpp create mode 100644 node_modules/uws/src/Node.h create mode 100644 node_modules/uws/src/Socket.cpp create mode 100644 node_modules/uws/src/Socket.h create mode 100644 node_modules/uws/src/WebSocket.cpp create mode 100644 node_modules/uws/src/WebSocket.h create mode 100644 node_modules/uws/src/WebSocketProtocol.h create mode 100644 node_modules/uws/src/addon.cpp create mode 100644 node_modules/uws/src/addon.h create mode 100644 node_modules/uws/src/http.h create mode 100644 node_modules/uws/src/uWS.h (limited to 'node_modules/uws/src') diff --git a/node_modules/uws/src/Asio.h b/node_modules/uws/src/Asio.h new file mode 100644 index 0000000..2792c29 --- /dev/null +++ b/node_modules/uws/src/Asio.h @@ -0,0 +1,184 @@ +#ifndef ASIO_H +#define ASIO_H + +#include + +typedef boost::asio::ip::tcp::socket::native_type uv_os_sock_t; +static const int UV_READABLE = 1; +static const int UV_WRITABLE = 2; + +struct Loop : boost::asio::io_service { + + static Loop *createLoop(bool defaultLoop = true) { + return new Loop; + } + + void destroy() { + delete this; + } + + void run() { + boost::asio::io_service::run(); + } +}; + +struct Timer { + boost::asio::deadline_timer asio_timer; + void *data; + + Timer(Loop *loop) : asio_timer(*loop) { + + } + + void start(void (*cb)(Timer *), int first, int repeat) { + asio_timer.expires_from_now(boost::posix_time::milliseconds(first)); + asio_timer.async_wait([this, cb, repeat](const boost::system::error_code &ec) { + if (ec != boost::asio::error::operation_aborted) { + if (repeat) { + start(cb, repeat, repeat); + } + cb(this); + } + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } + + // bug: cancel does not cancel expired timers! + // it has to guarantee that the timer is not called after + // stop is called! ffs boost! + void stop() { + asio_timer.cancel(); + } + + void close() { + asio_timer.get_io_service().post([this]() { + delete this; + }); + } +}; + +struct Async { + Loop *loop; + void (*cb)(Async *); + void *data; + + boost::asio::io_service::work asio_work; + + Async(Loop *loop) : loop(loop), asio_work(*loop) { + } + + void start(void (*cb)(Async *)) { + this->cb = cb; + } + + void send() { + loop->post([this]() { + cb(this); + }); + } + + void close() { + loop->post([this]() { + delete this; + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } +}; + +struct Poll { + boost::asio::posix::stream_descriptor *socket; + void (*cb)(Poll *p, int status, int events); + + Poll(Loop *loop, uv_os_sock_t fd) { + socket = new boost::asio::posix::stream_descriptor(*loop, fd); + socket->non_blocking(true); + } + + bool isClosed() { + return !socket; + } + + boost::asio::ip::tcp::socket::native_type getFd() { + return socket ? socket->native_handle() : -1; + } + + void setCb(void (*cb)(Poll *p, int status, int events)) { + this->cb = cb; + } + + void (*getCb())(Poll *, int, int) { + return cb; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + delete socket; + socket = new boost::asio::posix::stream_descriptor(*loop, fd); + socket->non_blocking(true); + } + + void start(Loop *, Poll *self, int events) { + if (events & UV_READABLE) { + socket->async_read_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) { + if (ec != boost::asio::error::operation_aborted) { + self->start(nullptr, self, UV_READABLE); + self->cb(self, ec ? -1 : 0, UV_READABLE); + } + }); + } + + if (events & UV_WRITABLE) { + socket->async_write_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) { + if (ec != boost::asio::error::operation_aborted) { + self->start(nullptr, self, UV_WRITABLE); + self->cb(self, ec ? -1 : 0, UV_WRITABLE); + } + }); + } + } + + void change(Loop *, Poll *self, int events) { + socket->cancel(); + start(nullptr, self, events); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + return false; + } + + // todo: asio is thread safe, use it! + bool threadSafeChange(Loop *loop, Poll *self, int events) { + return false; + } + + void stop(Loop *) { + socket->cancel(); + } + + // this is not correct, but it works for now + // think about transfer - should allow one to not delete + // but in this case it doesn't matter at all + void close(Loop *loop, void (*cb)(Poll *)) { + socket->release(); + socket->get_io_service().post([cb, this]() { + cb(this); + }); + delete socket; + socket = nullptr; + } +}; + +#endif // ASIO_H diff --git a/node_modules/uws/src/Backend.h b/node_modules/uws/src/Backend.h new file mode 100644 index 0000000..4bfed95 --- /dev/null +++ b/node_modules/uws/src/Backend.h @@ -0,0 +1,15 @@ +#ifndef BACKEND_H +#define BACKEND_H + +// Default to Epoll if nothing specified and on Linux +// Default to Libuv if nothing specified and not on Linux +#ifdef USE_ASIO +#include "Asio.h" +#elif !defined(__linux__) || defined(USE_LIBUV) +#include "Libuv.h" +#else +#define USE_EPOLL +#include "Epoll.h" +#endif + +#endif // BACKEND_H diff --git a/node_modules/uws/src/Epoll.cpp b/node_modules/uws/src/Epoll.cpp new file mode 100644 index 0000000..f78d2ba --- /dev/null +++ b/node_modules/uws/src/Epoll.cpp @@ -0,0 +1,60 @@ +#include "Backend.h" + +#ifdef USE_EPOLL + +// todo: remove this mutex, have callbacks set at program start +std::recursive_mutex cbMutex; +void (*callbacks[16])(Poll *, int, int); +int cbHead = 0; + +void Loop::run() { + timepoint = std::chrono::system_clock::now(); + while (numPolls) { + for (std::pair c : closing) { + numPolls--; + + c.second(c.first); + + if (!numPolls) { + closing.clear(); + return; + } + } + closing.clear(); + + int numFdReady = epoll_wait(epfd, readyEvents, 1024, delay); + timepoint = std::chrono::system_clock::now(); + + if (preCb) { + preCb(preCbData); + } + + for (int i = 0; i < numFdReady; i++) { + Poll *poll = (Poll *) readyEvents[i].data.ptr; + int status = -bool(readyEvents[i].events & EPOLLERR); + callbacks[poll->state.cbIndex](poll, status, readyEvents[i].events); + } + + while (timers.size() && timers[0].timepoint < timepoint) { + Timer *timer = timers[0].timer; + cancelledLastTimer = false; + timers[0].cb(timers[0].timer); + + if (cancelledLastTimer) { + continue; + } + + int repeat = timers[0].nextDelay; + auto cb = timers[0].cb; + timers.erase(timers.begin()); + if (repeat) { + timer->start(cb, repeat, repeat); + } + } + + if (postCb) { + postCb(postCbData); + } + } +} +#endif diff --git a/node_modules/uws/src/Epoll.h b/node_modules/uws/src/Epoll.h new file mode 100644 index 0000000..949791f --- /dev/null +++ b/node_modules/uws/src/Epoll.h @@ -0,0 +1,257 @@ +#ifndef EPOLL_H +#define EPOLL_H + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef int uv_os_sock_t; +static const int UV_READABLE = EPOLLIN; +static const int UV_WRITABLE = EPOLLOUT; + +struct Poll; +struct Timer; + +extern std::recursive_mutex cbMutex; +extern void (*callbacks[16])(Poll *, int, int); +extern int cbHead; + +struct Timepoint { + void (*cb)(Timer *); + Timer *timer; + std::chrono::system_clock::time_point timepoint; + int nextDelay; +}; + +struct Loop { + int epfd; + int numPolls = 0; + bool cancelledLastTimer; + int delay = -1; + epoll_event readyEvents[1024]; + std::chrono::system_clock::time_point timepoint; + std::vector timers; + std::vector> closing; + + void (*preCb)(void *) = nullptr; + void (*postCb)(void *) = nullptr; + void *preCbData, *postCbData; + + Loop(bool defaultLoop) { + epfd = epoll_create1(EPOLL_CLOEXEC); + timepoint = std::chrono::system_clock::now(); + } + + static Loop *createLoop(bool defaultLoop = true) { + return new Loop(defaultLoop); + } + + void destroy() { + ::close(epfd); + delete this; + } + + void run(); + + int getEpollFd() { + return epfd; + } +}; + +struct Timer { + Loop *loop; + void *data; + + Timer(Loop *loop) { + this->loop = loop; + } + + void start(void (*cb)(Timer *), int timeout, int repeat) { + loop->timepoint = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point timepoint = loop->timepoint + std::chrono::milliseconds(timeout); + + Timepoint t = {cb, this, timepoint, repeat}; + loop->timers.insert( + std::upper_bound(loop->timers.begin(), loop->timers.end(), t, [](const Timepoint &a, const Timepoint &b) { + return a.timepoint < b.timepoint; + }), + t + ); + + loop->delay = -1; + if (loop->timers.size()) { + loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); + } + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } + + // always called before destructor + void stop() { + auto pos = loop->timers.begin(); + for (Timepoint &t : loop->timers) { + if (t.timer == this) { + loop->timers.erase(pos); + break; + } + pos++; + } + loop->cancelledLastTimer = true; + + loop->delay = -1; + if (loop->timers.size()) { + loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); + } + } + + void close() { + delete this; + } +}; + +// 4 bytes +struct Poll { +protected: + struct { + int fd : 28; + unsigned int cbIndex : 4; + } state = {-1, 0}; + + Poll(Loop *loop, uv_os_sock_t fd) { + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); + state.fd = fd; + loop->numPolls++; + } + + // todo: pre-set all of callbacks up front and remove mutex + void setCb(void (*cb)(Poll *p, int status, int events)) { + cbMutex.lock(); + state.cbIndex = cbHead; + for (int i = 0; i < cbHead; i++) { + if (callbacks[i] == cb) { + state.cbIndex = i; + break; + } + } + if (state.cbIndex == cbHead) { + callbacks[cbHead++] = cb; + } + cbMutex.unlock(); + } + + void (*getCb())(Poll *, int, int) { + return callbacks[state.cbIndex]; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + state.fd = fd; + loop->numPolls++; + } + + void start(Loop *loop, Poll *self, int events) { + epoll_event event; + event.events = events; + event.data.ptr = self; + epoll_ctl(loop->epfd, EPOLL_CTL_ADD, state.fd, &event); + } + + void change(Loop *loop, Poll *self, int events) { + epoll_event event; + event.events = events; + event.data.ptr = self; + epoll_ctl(loop->epfd, EPOLL_CTL_MOD, state.fd, &event); + } + + void stop(Loop *loop) { + epoll_event event; + epoll_ctl(loop->epfd, EPOLL_CTL_DEL, state.fd, &event); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + stop(loop); + start(newLoop, this, events); + loop->numPolls--; + // needs to lock the newLoop's numPolls! + newLoop->numPolls++; + return true; + } + + bool threadSafeChange(Loop *loop, Poll *self, int events) { + change(loop, self, events); + return true; + } + + void close(Loop *loop, void (*cb)(Poll *)) { + state.fd = -1; + loop->closing.push_back({this, cb}); + } + +public: + bool isClosed() { + return state.fd == -1; + } + + uv_os_sock_t getFd() { + return state.fd; + } + + friend struct Loop; +}; + +// this should be put in the Loop as a general "post" function always available +struct Async : Poll { + void (*cb)(Async *); + Loop *loop; + void *data; + + Async(Loop *loop) : Poll(loop, ::eventfd(0, EFD_CLOEXEC)) { + this->loop = loop; + } + + void start(void (*cb)(Async *)) { + this->cb = cb; + Poll::setCb([](Poll *p, int, int) { + uint64_t val; + if (::read(((Async *) p)->state.fd, &val, 8) == 8) { + ((Async *) p)->cb((Async *) p); + } + }); + Poll::start(loop, this, UV_READABLE); + } + + void send() { + uint64_t one = 1; + if (::write(state.fd, &one, 8) != 8) { + return; + } + } + + void close() { + Poll::stop(loop); + ::close(state.fd); + Poll::close(loop, [](Poll *p) { + delete p; + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } +}; + +#endif // EPOLL_H diff --git a/node_modules/uws/src/Extensions.cpp b/node_modules/uws/src/Extensions.cpp new file mode 100644 index 0000000..ef8f9da --- /dev/null +++ b/node_modules/uws/src/Extensions.cpp @@ -0,0 +1,131 @@ +#include "Extensions.h" + +namespace uWS { + +enum ExtensionTokens { + TOK_PERMESSAGE_DEFLATE = 1838, + TOK_SERVER_NO_CONTEXT_TAKEOVER = 2807, + TOK_CLIENT_NO_CONTEXT_TAKEOVER = 2783, + TOK_SERVER_MAX_WINDOW_BITS = 2372, + TOK_CLIENT_MAX_WINDOW_BITS = 2348 +}; + +class ExtensionsParser { +private: + int *lastInteger = nullptr; + +public: + bool perMessageDeflate = false; + bool serverNoContextTakeover = false; + bool clientNoContextTakeover = false; + int serverMaxWindowBits = 0; + int clientMaxWindowBits = 0; + + int getToken(const char *&in, const char *stop); + ExtensionsParser(const char *data, size_t length); +}; + +int ExtensionsParser::getToken(const char *&in, const char *stop) { + while (!isalnum(*in) && in != stop) { + in++; + } + + int hashedToken = 0; + while (isalnum(*in) || *in == '-' || *in == '_') { + if (isdigit(*in)) { + hashedToken = hashedToken * 10 - (*in - '0'); + } else { + hashedToken += *in; + } + in++; + } + return hashedToken; +} + +ExtensionsParser::ExtensionsParser(const char *data, size_t length) { + const char *stop = data + length; + int token = 1; + for (; token && token != TOK_PERMESSAGE_DEFLATE; token = getToken(data, stop)); + + perMessageDeflate = (token == TOK_PERMESSAGE_DEFLATE); + while ((token = getToken(data, stop))) { + switch (token) { + case TOK_PERMESSAGE_DEFLATE: + return; + case TOK_SERVER_NO_CONTEXT_TAKEOVER: + serverNoContextTakeover = true; + break; + case TOK_CLIENT_NO_CONTEXT_TAKEOVER: + clientNoContextTakeover = true; + break; + case TOK_SERVER_MAX_WINDOW_BITS: + serverMaxWindowBits = 1; + lastInteger = &serverMaxWindowBits; + break; + case TOK_CLIENT_MAX_WINDOW_BITS: + clientMaxWindowBits = 1; + lastInteger = &clientMaxWindowBits; + break; + default: + if (token < 0 && lastInteger) { + *lastInteger = -token; + } + break; + } + } +} + +template +ExtensionsNegotiator::ExtensionsNegotiator(int wantedOptions) { + options = wantedOptions; +} + +template +std::string ExtensionsNegotiator::generateOffer() { + std::string extensionsOffer; + if (options & Options::PERMESSAGE_DEFLATE) { + extensionsOffer += "permessage-deflate"; + + if (options & Options::CLIENT_NO_CONTEXT_TAKEOVER) { + extensionsOffer += "; client_no_context_takeover"; + } + + if (options & Options::SERVER_NO_CONTEXT_TAKEOVER) { + extensionsOffer += "; server_no_context_takeover"; + } + } + + return extensionsOffer; +} + +template +void ExtensionsNegotiator::readOffer(std::string offer) { + if (isServer) { + ExtensionsParser extensionsParser(offer.data(), offer.length()); + if ((options & PERMESSAGE_DEFLATE) && extensionsParser.perMessageDeflate) { + if (extensionsParser.clientNoContextTakeover || (options & CLIENT_NO_CONTEXT_TAKEOVER)) { + options |= CLIENT_NO_CONTEXT_TAKEOVER; + } + + if (extensionsParser.serverNoContextTakeover) { + options |= SERVER_NO_CONTEXT_TAKEOVER; + } else { + options &= ~SERVER_NO_CONTEXT_TAKEOVER; + } + } else { + options &= ~PERMESSAGE_DEFLATE; + } + } else { + // todo! + } +} + +template +int ExtensionsNegotiator::getNegotiatedOptions() { + return options; +} + +template class ExtensionsNegotiator; +template class ExtensionsNegotiator; + +} diff --git a/node_modules/uws/src/Extensions.h b/node_modules/uws/src/Extensions.h new file mode 100644 index 0000000..763b4d2 --- /dev/null +++ b/node_modules/uws/src/Extensions.h @@ -0,0 +1,29 @@ +#ifndef EXTENSIONS_UWS_H +#define EXTENSIONS_UWS_H + +#include + +namespace uWS { + +enum Options : unsigned int { + NO_OPTIONS = 0, + PERMESSAGE_DEFLATE = 1, + SERVER_NO_CONTEXT_TAKEOVER = 2, + CLIENT_NO_CONTEXT_TAKEOVER = 4, + NO_DELAY = 8 +}; + +template +class ExtensionsNegotiator { +private: + int options; +public: + ExtensionsNegotiator(int wantedOptions); + std::string generateOffer(); + void readOffer(std::string offer); + int getNegotiatedOptions(); +}; + +} + +#endif // EXTENSIONS_UWS_H diff --git a/node_modules/uws/src/Group.cpp b/node_modules/uws/src/Group.cpp new file mode 100644 index 0000000..028b1a0 --- /dev/null +++ b/node_modules/uws/src/Group.cpp @@ -0,0 +1,263 @@ +#include "Group.h" +#include "Hub.h" + +namespace uWS { + +template +void Group::setUserData(void *user) { + this->userData = user; +} + +template +void *Group::getUserData() { + return userData; +} + +template +void Group::timerCallback(Timer *timer) { + Group *group = (Group *) timer->getData(); + + group->forEach([](uWS::WebSocket *webSocket) { + if (webSocket->hasOutstandingPong) { + webSocket->terminate(); + } else { + webSocket->hasOutstandingPong = true; + } + }); + + if (group->userPingMessage.length()) { + group->broadcast(group->userPingMessage.data(), group->userPingMessage.length(), OpCode::TEXT); + } else { + group->broadcast(nullptr, 0, OpCode::PING); + } +} + +template +void Group::startAutoPing(int intervalMs, std::string userMessage) { + timer = new Timer(loop); + timer->setData(this); + timer->start(timerCallback, intervalMs, intervalMs); + userPingMessage = userMessage; +} + +template +void Group::addHttpSocket(HttpSocket *httpSocket) { + if (httpSocketHead) { + httpSocketHead->prev = httpSocket; + httpSocket->next = httpSocketHead; + } else { + httpSocket->next = nullptr; + // start timer + httpTimer = new Timer(hub->getLoop()); + httpTimer->setData(this); + httpTimer->start([](Timer *httpTimer) { + Group *group = (Group *) httpTimer->getData(); + group->forEachHttpSocket([](HttpSocket *httpSocket) { + if (httpSocket->missedDeadline) { + httpSocket->terminate(); + } else if (!httpSocket->outstandingResponsesHead) { + httpSocket->missedDeadline = true; + } + }); + }, 1000, 1000); + } + httpSocketHead = httpSocket; + httpSocket->prev = nullptr; +} + +template +void Group::removeHttpSocket(HttpSocket *httpSocket) { + if (iterators.size()) { + iterators.top() = httpSocket->next; + } + if (httpSocket->prev == httpSocket->next) { + httpSocketHead = nullptr; + httpTimer->stop(); + httpTimer->close(); + } else { + if (httpSocket->prev) { + ((HttpSocket *) httpSocket->prev)->next = httpSocket->next; + } else { + httpSocketHead = (HttpSocket *) httpSocket->next; + } + if (httpSocket->next) { + ((HttpSocket *) httpSocket->next)->prev = httpSocket->prev; + } + } +} + +template +void Group::addWebSocket(WebSocket *webSocket) { + if (webSocketHead) { + webSocketHead->prev = webSocket; + webSocket->next = webSocketHead; + } else { + webSocket->next = nullptr; + } + webSocketHead = webSocket; + webSocket->prev = nullptr; +} + +template +void Group::removeWebSocket(WebSocket *webSocket) { + if (iterators.size()) { + iterators.top() = webSocket->next; + } + if (webSocket->prev == webSocket->next) { + webSocketHead = nullptr; + } else { + if (webSocket->prev) { + ((WebSocket *) webSocket->prev)->next = webSocket->next; + } else { + webSocketHead = (WebSocket *) webSocket->next; + } + if (webSocket->next) { + ((WebSocket *) webSocket->next)->prev = webSocket->prev; + } + } +} + +template +Group::Group(int extensionOptions, unsigned int maxPayload, Hub *hub, uS::NodeData *nodeData) : uS::NodeData(*nodeData), maxPayload(maxPayload), hub(hub), extensionOptions(extensionOptions) { + connectionHandler = [](WebSocket *, HttpRequest) {}; + transferHandler = [](WebSocket *) {}; + messageHandler = [](WebSocket *, char *, size_t, OpCode) {}; + disconnectionHandler = [](WebSocket *, int, char *, size_t) {}; + pingHandler = pongHandler = [](WebSocket *, char *, size_t) {}; + errorHandler = [](errorType) {}; + httpRequestHandler = [](HttpResponse *, HttpRequest, char *, size_t, size_t) {}; + httpConnectionHandler = [](HttpSocket *) {}; + httpDisconnectionHandler = [](HttpSocket *) {}; + httpCancelledRequestHandler = [](HttpResponse *) {}; + httpDataHandler = [](HttpResponse *, char *, size_t, size_t) {}; + + this->extensionOptions |= CLIENT_NO_CONTEXT_TAKEOVER | SERVER_NO_CONTEXT_TAKEOVER; +} + +template +void Group::stopListening() { + if (isServer) { + if (user) { + // todo: we should allow one group to listen to many ports! + uS::ListenSocket *listenSocket = (uS::ListenSocket *) user; + + if (listenSocket->timer) { + listenSocket->timer->stop(); + listenSocket->timer->close(); + } + + listenSocket->closeSocket(); + + // mark as stopped listening (extra care?) + user = nullptr; + } + } + + if (async) { + async->close(); + } +} + +template +void Group::onConnection(std::function *, HttpRequest)> handler) { + connectionHandler = handler; +} + +template +void Group::onTransfer(std::function *)> handler) { + transferHandler = handler; +} + +template +void Group::onMessage(std::function *, char *, size_t, OpCode)> handler) { + messageHandler = handler; +} + +template +void Group::onDisconnection(std::function *, int, char *, size_t)> handler) { + disconnectionHandler = handler; +} + +template +void Group::onPing(std::function *, char *, size_t)> handler) { + pingHandler = handler; +} + +template +void Group::onPong(std::function *, char *, size_t)> handler) { + pongHandler = handler; +} + +template +void Group::onError(std::function handler) { + errorHandler = handler; +} + +template +void Group::onHttpConnection(std::function *)> handler) { + httpConnectionHandler = handler; +} + +template +void Group::onHttpRequest(std::function handler) { + httpRequestHandler = handler; +} + +template +void Group::onHttpData(std::function handler) { + httpDataHandler = handler; +} + +template +void Group::onHttpDisconnection(std::function *)> handler) { + httpDisconnectionHandler = handler; +} + +template +void Group::onCancelledHttpRequest(std::function handler) { + httpCancelledRequestHandler = handler; +} + +template +void Group::onHttpUpgrade(std::function *, HttpRequest)> handler) { + httpUpgradeHandler = handler; +} + +template +void Group::broadcast(const char *message, size_t length, OpCode opCode) { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*asyncMutex); +#endif + + typename WebSocket::PreparedMessage *preparedMessage = WebSocket::prepareMessage((char *) message, length, opCode, false); + forEach([preparedMessage](uWS::WebSocket *ws) { + ws->sendPrepared(preparedMessage); + }); + WebSocket::finalizeMessage(preparedMessage); +} + +template +void Group::terminate() { + forEach([](uWS::WebSocket *ws) { + ws->terminate(); + }); + stopListening(); +} + +template +void Group::close(int code, char *message, size_t length) { + forEach([code, message, length](uWS::WebSocket *ws) { + ws->close(code, message, length); + }); + stopListening(); + if (timer) { + timer->stop(); + timer->close(); + } +} + +template struct Group; +template struct Group; + +} diff --git a/node_modules/uws/src/Group.h b/node_modules/uws/src/Group.h new file mode 100644 index 0000000..18c8c63 --- /dev/null +++ b/node_modules/uws/src/Group.h @@ -0,0 +1,144 @@ +#ifndef GROUP_UWS_H +#define GROUP_UWS_H + +#include "WebSocket.h" +#include "HTTPSocket.h" +#include "Extensions.h" +#include +#include + +namespace uWS { + +enum ListenOptions { + TRANSFERS +}; + +struct Hub; + +template +struct WIN32_EXPORT Group : private uS::NodeData { +protected: + friend struct Hub; + friend struct WebSocket; + friend struct HttpSocket; + friend struct HttpSocket; + + std::function *, HttpRequest)> connectionHandler; + std::function *)> transferHandler; + std::function *, char *message, size_t length, OpCode opCode)> messageHandler; + std::function *, int code, char *message, size_t length)> disconnectionHandler; + std::function *, char *, size_t)> pingHandler; + std::function *, char *, size_t)> pongHandler; + std::function *)> httpConnectionHandler; + std::function httpRequestHandler; + std::function httpDataHandler; + std::function httpCancelledRequestHandler; + std::function *)> httpDisconnectionHandler; + std::function *, HttpRequest)> httpUpgradeHandler; + + using errorType = typename std::conditional::type; + std::function errorHandler; + + unsigned int maxPayload; + Hub *hub; + int extensionOptions; + Timer *timer = nullptr, *httpTimer = nullptr; + std::string userPingMessage; + std::stack iterators; + + // todo: cannot be named user, collides with parent! + void *userData = nullptr; + static void timerCallback(Timer *timer); + + WebSocket *webSocketHead = nullptr; + HttpSocket *httpSocketHead = nullptr; + + void addWebSocket(WebSocket *webSocket); + void removeWebSocket(WebSocket *webSocket); + + // todo: remove these, template + void addHttpSocket(HttpSocket *httpSocket); + void removeHttpSocket(HttpSocket *httpSocket); + + Group(int extensionOptions, unsigned int maxPayload, Hub *hub, uS::NodeData *nodeData); + void stopListening(); + +public: + void onConnection(std::function *, HttpRequest)> handler); + void onTransfer(std::function *)> handler); + void onMessage(std::function *, char *, size_t, OpCode)> handler); + void onDisconnection(std::function *, int code, char *message, size_t length)> handler); + void onPing(std::function *, char *, size_t)> handler); + void onPong(std::function *, char *, size_t)> handler); + void onError(std::function handler); + void onHttpConnection(std::function *)> handler); + void onHttpRequest(std::function handler); + void onHttpData(std::function handler); + void onHttpDisconnection(std::function *)> handler); + void onCancelledHttpRequest(std::function handler); + void onHttpUpgrade(std::function *, HttpRequest)> handler); + + // Thread safe + void broadcast(const char *message, size_t length, OpCode opCode); + void setUserData(void *user); + void *getUserData(); + + // Not thread safe + void terminate(); + void close(int code = 1000, char *message = nullptr, size_t length = 0); + void startAutoPing(int intervalMs, std::string userMessage = ""); + + // same as listen(TRANSFERS), backwards compatible API for now + void addAsync() { + if (!async) { + NodeData::addAsync(); + } + } + + void listen(ListenOptions listenOptions) { + if (listenOptions == TRANSFERS && !async) { + addAsync(); + } + } + + template + void forEach(const F &cb) { + Poll *iterator = webSocketHead; + iterators.push(iterator); + while (iterator) { + Poll *lastIterator = iterator; + cb((WebSocket *) iterator); + iterator = iterators.top(); + if (lastIterator == iterator) { + iterator = ((uS::Socket *) iterator)->next; + iterators.top() = iterator; + } + } + iterators.pop(); + } + + // duplicated code for now! + template + void forEachHttpSocket(const F &cb) { + Poll *iterator = httpSocketHead; + iterators.push(iterator); + while (iterator) { + Poll *lastIterator = iterator; + cb((HttpSocket *) iterator); + iterator = iterators.top(); + if (lastIterator == iterator) { + iterator = ((uS::Socket *) iterator)->next; + iterators.top() = iterator; + } + } + iterators.pop(); + } + + static Group *from(uS::Socket *s) { + return static_cast *>(s->getNodeData()); + } +}; + +} + +#endif // GROUP_UWS_H diff --git a/node_modules/uws/src/HTTPSocket.cpp b/node_modules/uws/src/HTTPSocket.cpp new file mode 100644 index 0000000..84e30b2 --- /dev/null +++ b/node_modules/uws/src/HTTPSocket.cpp @@ -0,0 +1,310 @@ +#include "HTTPSocket.h" +#include "Group.h" +#include "Extensions.h" +#include + +#define MAX_HEADERS 100 +#define MAX_HEADER_BUFFER_SIZE 4096 +#define FORCE_SLOW_PATH false + +namespace uWS { + +// UNSAFETY NOTE: assumes *end == '\r' (might unref end pointer) +char *getHeaders(char *buffer, char *end, Header *headers, size_t maxHeaders) { + for (unsigned int i = 0; i < maxHeaders; i++) { + for (headers->key = buffer; (*buffer != ':') & (*buffer > 32); *(buffer++) |= 32); + if (*buffer == '\r') { + if ((buffer != end) & (buffer[1] == '\n') & (i > 0)) { + headers->key = nullptr; + return buffer + 2; + } else { + return nullptr; + } + } else { + headers->keyLength = buffer - headers->key; + for (buffer++; (*buffer == ':' || *buffer < 33) && *buffer != '\r'; buffer++); + headers->value = buffer; + buffer = (char *) memchr(buffer, '\r', end - buffer); //for (; *buffer != '\r'; buffer++); + if (buffer /*!= end*/ && buffer[1] == '\n') { + headers->valueLength = buffer - headers->value; + buffer += 2; + headers++; + } else { + return nullptr; + } + } + } + return nullptr; +} + +// UNSAFETY NOTE: assumes 24 byte input length +static void base64(unsigned char *src, char *dst) { + static const char *b64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + for (int i = 0; i < 18; i += 3) { + *dst++ = b64[(src[i] >> 2) & 63]; + *dst++ = b64[((src[i] & 3) << 4) | ((src[i + 1] & 240) >> 4)]; + *dst++ = b64[((src[i + 1] & 15) << 2) | ((src[i + 2] & 192) >> 6)]; + *dst++ = b64[src[i + 2] & 63]; + } + *dst++ = b64[(src[18] >> 2) & 63]; + *dst++ = b64[((src[18] & 3) << 4) | ((src[19] & 240) >> 4)]; + *dst++ = b64[((src[19] & 15) << 2)]; + *dst++ = '='; +} + +template +uS::Socket *HttpSocket::onData(uS::Socket *s, char *data, size_t length) { + HttpSocket *httpSocket = (HttpSocket *) s; + + httpSocket->cork(true); + + if (httpSocket->contentLength) { + httpSocket->missedDeadline = false; + if (httpSocket->contentLength >= length) { + Group::from(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, length, httpSocket->contentLength -= length); + return httpSocket; + } else { + Group::from(httpSocket)->httpDataHandler(httpSocket->outstandingResponsesTail, data, httpSocket->contentLength, 0); + data += httpSocket->contentLength; + length -= httpSocket->contentLength; + httpSocket->contentLength = 0; + } + } + + if (FORCE_SLOW_PATH || httpSocket->httpBuffer.length()) { + if (httpSocket->httpBuffer.length() + length > MAX_HEADER_BUFFER_SIZE) { + httpSocket->onEnd(httpSocket); + return httpSocket; + } + + httpSocket->httpBuffer.reserve(httpSocket->httpBuffer.length() + length + WebSocketProtocol>::CONSUME_POST_PADDING); + httpSocket->httpBuffer.append(data, length); + data = (char *) httpSocket->httpBuffer.data(); + length = httpSocket->httpBuffer.length(); + } + + char *end = data + length; + char *cursor = data; + *end = '\r'; + Header headers[MAX_HEADERS]; + do { + char *lastCursor = cursor; + if ((cursor = getHeaders(cursor, end, headers, MAX_HEADERS))) { + HttpRequest req(headers); + + if (isServer) { + headers->valueLength = std::max(0, headers->valueLength - 9); + httpSocket->missedDeadline = false; + if (req.getHeader("upgrade", 7)) { + if (Group::from(httpSocket)->httpUpgradeHandler) { + Group::from(httpSocket)->httpUpgradeHandler((HttpSocket *) httpSocket, req); + } else { + Header secKey = req.getHeader("sec-websocket-key", 17); + Header extensions = req.getHeader("sec-websocket-extensions", 24); + Header subprotocol = req.getHeader("sec-websocket-protocol", 22); + if (secKey.valueLength == 24) { + bool perMessageDeflate; + httpSocket->upgrade(secKey.value, extensions.value, extensions.valueLength, + subprotocol.value, subprotocol.valueLength, &perMessageDeflate); + Group::from(httpSocket)->removeHttpSocket(httpSocket); + + // Warning: changes socket, needs to inform the stack of Poll address change! + WebSocket *webSocket = new WebSocket(perMessageDeflate, httpSocket); + webSocket->template setState>(); + webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE)); + Group::from(webSocket)->addWebSocket(webSocket); + + webSocket->cork(true); + Group::from(webSocket)->connectionHandler(webSocket, req); + // todo: should not uncork if closed! + webSocket->cork(false); + delete httpSocket; + + return webSocket; + } else { + httpSocket->onEnd(httpSocket); + } + } + return httpSocket; + } else { + if (Group::from(httpSocket)->httpRequestHandler) { + + HttpResponse *res = HttpResponse::allocateResponse(httpSocket); + if (httpSocket->outstandingResponsesTail) { + httpSocket->outstandingResponsesTail->next = res; + } else { + httpSocket->outstandingResponsesHead = res; + } + httpSocket->outstandingResponsesTail = res; + + Header contentLength; + if (req.getMethod() != HttpMethod::METHOD_GET && (contentLength = req.getHeader("content-length", 14))) { + httpSocket->contentLength = atoi(contentLength.value); + size_t bytesToRead = std::min(httpSocket->contentLength, end - cursor); + Group::from(httpSocket)->httpRequestHandler(res, req, cursor, bytesToRead, httpSocket->contentLength -= bytesToRead); + cursor += bytesToRead; + } else { + Group::from(httpSocket)->httpRequestHandler(res, req, nullptr, 0, 0); + } + + if (httpSocket->isClosed() || httpSocket->isShuttingDown()) { + return httpSocket; + } + } else { + httpSocket->onEnd(httpSocket); + return httpSocket; + } + } + } else { + if (req.getHeader("upgrade", 7)) { + + // Warning: changes socket, needs to inform the stack of Poll address change! + WebSocket *webSocket = new WebSocket(false, httpSocket); + httpSocket->cancelTimeout(); + webSocket->setUserData(httpSocket->httpUser); + webSocket->template setState>(); + webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE)); + Group::from(webSocket)->addWebSocket(webSocket); + + webSocket->cork(true); + Group::from(webSocket)->connectionHandler(webSocket, req); + if (!(webSocket->isClosed() || webSocket->isShuttingDown())) { + WebSocketProtocol>::consume(cursor, end - cursor, webSocket); + } + webSocket->cork(false); + delete httpSocket; + + return webSocket; + } else { + httpSocket->onEnd(httpSocket); + } + return httpSocket; + } + } else { + if (!httpSocket->httpBuffer.length()) { + if (length > MAX_HEADER_BUFFER_SIZE) { + httpSocket->onEnd(httpSocket); + } else { + httpSocket->httpBuffer.append(lastCursor, end - lastCursor); + } + } + return httpSocket; + } + } while(cursor != end); + + httpSocket->cork(false); + httpSocket->httpBuffer.clear(); + + return httpSocket; +} + +// todo: make this into a transformer and make use of sendTransformed +template +void HttpSocket::upgrade(const char *secKey, const char *extensions, size_t extensionsLength, + const char *subprotocol, size_t subprotocolLength, bool *perMessageDeflate) { + + Queue::Message *messagePtr; + + if (isServer) { + *perMessageDeflate = false; + std::string extensionsResponse; + if (extensionsLength) { + Group *group = Group::from(this); + ExtensionsNegotiator extensionsNegotiator(group->extensionOptions); + extensionsNegotiator.readOffer(std::string(extensions, extensionsLength)); + extensionsResponse = extensionsNegotiator.generateOffer(); + if (extensionsNegotiator.getNegotiatedOptions() & PERMESSAGE_DEFLATE) { + *perMessageDeflate = true; + } + } + + unsigned char shaInput[] = "XXXXXXXXXXXXXXXXXXXXXXXX258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + memcpy(shaInput, secKey, 24); + unsigned char shaDigest[SHA_DIGEST_LENGTH]; + SHA1(shaInput, sizeof(shaInput) - 1, shaDigest); + + char upgradeBuffer[1024]; + memcpy(upgradeBuffer, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: ", 97); + base64(shaDigest, upgradeBuffer + 97); + memcpy(upgradeBuffer + 125, "\r\n", 2); + size_t upgradeResponseLength = 127; + if (extensionsResponse.length() && extensionsResponse.length() < 200) { + memcpy(upgradeBuffer + upgradeResponseLength, "Sec-WebSocket-Extensions: ", 26); + memcpy(upgradeBuffer + upgradeResponseLength + 26, extensionsResponse.data(), extensionsResponse.length()); + memcpy(upgradeBuffer + upgradeResponseLength + 26 + extensionsResponse.length(), "\r\n", 2); + upgradeResponseLength += 26 + extensionsResponse.length() + 2; + } + if (subprotocolLength && subprotocolLength < 200) { + memcpy(upgradeBuffer + upgradeResponseLength, "Sec-WebSocket-Protocol: ", 24); + memcpy(upgradeBuffer + upgradeResponseLength + 24, subprotocol, subprotocolLength); + memcpy(upgradeBuffer + upgradeResponseLength + 24 + subprotocolLength, "\r\n", 2); + upgradeResponseLength += 24 + subprotocolLength + 2; + } + static char stamp[] = "Sec-WebSocket-Version: 13\r\nWebSocket-Server: uWebSockets\r\n\r\n"; + memcpy(upgradeBuffer + upgradeResponseLength, stamp, sizeof(stamp) - 1); + upgradeResponseLength += sizeof(stamp) - 1; + + messagePtr = allocMessage(upgradeResponseLength, upgradeBuffer); + } else { + messagePtr = allocMessage(httpBuffer.length(), httpBuffer.data()); + httpBuffer.clear(); + } + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + freeMessage(messagePtr); + } else { + messagePtr->callback = nullptr; + } + } else { + freeMessage(messagePtr); + } +} + +template +void HttpSocket::onEnd(uS::Socket *s) { + HttpSocket *httpSocket = (HttpSocket *) s; + + if (!httpSocket->isShuttingDown()) { + if (isServer) { + Group::from(httpSocket)->removeHttpSocket(httpSocket); + Group::from(httpSocket)->httpDisconnectionHandler(httpSocket); + } + } else { + httpSocket->cancelTimeout(); + } + + httpSocket->template closeSocket>(); + + while (!httpSocket->messageQueue.empty()) { + Queue::Message *message = httpSocket->messageQueue.front(); + if (message->callback) { + message->callback(nullptr, message->callbackData, true, nullptr); + } + httpSocket->messageQueue.pop(); + } + + while (httpSocket->outstandingResponsesHead) { + Group::from(httpSocket)->httpCancelledRequestHandler(httpSocket->outstandingResponsesHead); + HttpResponse *next = httpSocket->outstandingResponsesHead->next; + delete httpSocket->outstandingResponsesHead; + httpSocket->outstandingResponsesHead = next; + } + + if (httpSocket->preAllocatedResponse) { + delete httpSocket->preAllocatedResponse; + } + + httpSocket->nodeData->clearPendingPollChanges(httpSocket); + + if (!isServer) { + httpSocket->cancelTimeout(); + Group::from(httpSocket)->errorHandler(httpSocket->httpUser); + } +} + +template struct HttpSocket; +template struct HttpSocket; + +} diff --git a/node_modules/uws/src/HTTPSocket.h b/node_modules/uws/src/HTTPSocket.h new file mode 100644 index 0000000..5cc7a7f --- /dev/null +++ b/node_modules/uws/src/HTTPSocket.h @@ -0,0 +1,285 @@ +#ifndef HTTPSOCKET_UWS_H +#define HTTPSOCKET_UWS_H + +#include "Socket.h" +#include +// #include + +namespace uWS { + +struct Header { + char *key, *value; + unsigned int keyLength, valueLength; + + operator bool() { + return key; + } + + // slow without string_view! + std::string toString() { + return std::string(value, valueLength); + } +}; + +enum HttpMethod { + METHOD_GET, + METHOD_POST, + METHOD_PUT, + METHOD_DELETE, + METHOD_PATCH, + METHOD_OPTIONS, + METHOD_HEAD, + METHOD_TRACE, + METHOD_CONNECT, + METHOD_INVALID +}; + +struct HttpRequest { + Header *headers; + Header getHeader(const char *key) { + return getHeader(key, strlen(key)); + } + + HttpRequest(Header *headers = nullptr) : headers(headers) {} + + Header getHeader(const char *key, size_t length) { + if (headers) { + for (Header *h = headers; *++h; ) { + if (h->keyLength == length && !strncmp(h->key, key, length)) { + return *h; + } + } + } + return {nullptr, nullptr, 0, 0}; + } + + Header getUrl() { + if (headers->key) { + return *headers; + } + return {nullptr, nullptr, 0, 0}; + } + + HttpMethod getMethod() { + if (!headers->key) { + return METHOD_INVALID; + } + switch (headers->keyLength) { + case 3: + if (!strncmp(headers->key, "get", 3)) { + return METHOD_GET; + } else if (!strncmp(headers->key, "put", 3)) { + return METHOD_PUT; + } + break; + case 4: + if (!strncmp(headers->key, "post", 4)) { + return METHOD_POST; + } else if (!strncmp(headers->key, "head", 4)) { + return METHOD_HEAD; + } + break; + case 5: + if (!strncmp(headers->key, "patch", 5)) { + return METHOD_PATCH; + } else if (!strncmp(headers->key, "trace", 5)) { + return METHOD_TRACE; + } + break; + case 6: + if (!strncmp(headers->key, "delete", 6)) { + return METHOD_DELETE; + } + break; + case 7: + if (!strncmp(headers->key, "options", 7)) { + return METHOD_OPTIONS; + } else if (!strncmp(headers->key, "connect", 7)) { + return METHOD_CONNECT; + } + break; + } + return METHOD_INVALID; + } +}; + +struct HttpResponse; + +template +struct WIN32_EXPORT HttpSocket : uS::Socket { + void *httpUser; // remove this later, setTimeout occupies user for now + HttpResponse *outstandingResponsesHead = nullptr; + HttpResponse *outstandingResponsesTail = nullptr; + HttpResponse *preAllocatedResponse = nullptr; + + std::string httpBuffer; + size_t contentLength = 0; + bool missedDeadline = false; + + HttpSocket(uS::Socket *socket) : uS::Socket(std::move(*socket)) {} + + void terminate() { + onEnd(this); + } + + void upgrade(const char *secKey, const char *extensions, + size_t extensionsLength, const char *subprotocol, + size_t subprotocolLength, bool *perMessageDeflate); + +private: + friend struct uS::Socket; + friend struct HttpResponse; + friend struct Hub; + static uS::Socket *onData(uS::Socket *s, char *data, size_t length); + static void onEnd(uS::Socket *s); +}; + +struct HttpResponse { + HttpSocket *httpSocket; + HttpResponse *next = nullptr; + void *userData = nullptr; + void *extraUserData = nullptr; + HttpSocket::Queue::Message *messageQueue = nullptr; + bool hasEnded = false; + bool hasHead = false; + + HttpResponse(HttpSocket *httpSocket) : httpSocket(httpSocket) { + + } + + template + static HttpResponse *allocateResponse(HttpSocket *httpSocket) { + if (httpSocket->preAllocatedResponse) { + HttpResponse *ret = httpSocket->preAllocatedResponse; + httpSocket->preAllocatedResponse = nullptr; + return ret; + } else { + return new HttpResponse((HttpSocket *) httpSocket); + } + } + + //template + void freeResponse(HttpSocket *httpData) { + if (httpData->preAllocatedResponse) { + delete this; + } else { + httpData->preAllocatedResponse = this; + } + } + + void write(const char *message, size_t length = 0, + void(*callback)(void *httpSocket, void *data, bool cancelled, void *reserved) = nullptr, + void *callbackData = nullptr) { + + struct NoopTransformer { + static size_t estimate(const char *data, size_t length) { + return length; + } + + static size_t transform(const char *src, char *dst, size_t length, int transformData) { + memcpy(dst, src, length); + return length; + } + }; + + httpSocket->sendTransformed(message, length, callback, callbackData, 0); + hasHead = true; + } + + // todo: maybe this function should have a fast path for 0 length? + void end(const char *message = nullptr, size_t length = 0, + void(*callback)(void *httpResponse, void *data, bool cancelled, void *reserved) = nullptr, + void *callbackData = nullptr) { + + struct TransformData { + bool hasHead; + } transformData = {hasHead}; + + struct HttpTransformer { + + // todo: this should get TransformData! + static size_t estimate(const char *data, size_t length) { + return length + 128; + } + + static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) { + // todo: sprintf is extremely slow + int offset = transformData.hasHead ? 0 : std::sprintf(dst, "HTTP/1.1 200 OK\r\nContent-Length: %u\r\n\r\n", (unsigned int) length); + memcpy(dst + offset, src, length); + return length + offset; + } + }; + + if (httpSocket->outstandingResponsesHead != this) { + HttpSocket::Queue::Message *messagePtr = httpSocket->allocMessage(HttpTransformer::estimate(message, length)); + messagePtr->length = HttpTransformer::transform(message, (char *) messagePtr->data, length, transformData); + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + messagePtr->nextMessage = messageQueue; + messageQueue = messagePtr; + hasEnded = true; + } else { + httpSocket->sendTransformed(message, length, callback, callbackData, transformData); + // move head as far as possible + HttpResponse *head = next; + while (head) { + // empty message queue + HttpSocket::Queue::Message *messagePtr = head->messageQueue; + while (messagePtr) { + HttpSocket::Queue::Message *nextMessage = messagePtr->nextMessage; + + bool wasTransferred; + if (httpSocket->write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + httpSocket->freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + httpSocket->freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, true, nullptr); + } + goto updateHead; + } + messagePtr = nextMessage; + } + // cannot go beyond unfinished responses + if (!head->hasEnded) { + break; + } else { + HttpResponse *next = head->next; + head->freeResponse(httpSocket); + head = next; + } + } + updateHead: + httpSocket->outstandingResponsesHead = head; + if (!head) { + httpSocket->outstandingResponsesTail = nullptr; + } + + freeResponse(httpSocket); + } + } + + void setUserData(void *userData) { + this->userData = userData; + } + + void *getUserData() { + return userData; + } + + HttpSocket *getHttpSocket() { + return httpSocket; + } +}; + +} + +#endif // HTTPSOCKET_UWS_H diff --git a/node_modules/uws/src/Hub.cpp b/node_modules/uws/src/Hub.cpp new file mode 100644 index 0000000..771c263 --- /dev/null +++ b/node_modules/uws/src/Hub.cpp @@ -0,0 +1,177 @@ +#include "Hub.h" +#include "HTTPSocket.h" +#include + +namespace uWS { + +char *Hub::inflate(char *data, size_t &length, size_t maxPayload) { + dynamicInflationBuffer.clear(); + + inflationStream.next_in = (Bytef *) data; + inflationStream.avail_in = length; + + int err; + do { + inflationStream.next_out = (Bytef *) inflationBuffer; + inflationStream.avail_out = LARGE_BUFFER_SIZE; + err = ::inflate(&inflationStream, Z_FINISH); + if (!inflationStream.avail_in) { + break; + } + + dynamicInflationBuffer.append(inflationBuffer, LARGE_BUFFER_SIZE - inflationStream.avail_out); + } while (err == Z_BUF_ERROR && dynamicInflationBuffer.length() <= maxPayload); + + inflateReset(&inflationStream); + + if ((err != Z_BUF_ERROR && err != Z_OK) || dynamicInflationBuffer.length() > maxPayload) { + length = 0; + return nullptr; + } + + if (dynamicInflationBuffer.length()) { + dynamicInflationBuffer.append(inflationBuffer, LARGE_BUFFER_SIZE - inflationStream.avail_out); + + length = dynamicInflationBuffer.length(); + return (char *) dynamicInflationBuffer.data(); + } + + length = LARGE_BUFFER_SIZE - inflationStream.avail_out; + return inflationBuffer; +} + +void Hub::onServerAccept(uS::Socket *s) { + HttpSocket *httpSocket = new HttpSocket(s); + delete s; + + httpSocket->setState>(); + httpSocket->start(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + httpSocket->setNoDelay(true); + Group::from(httpSocket)->addHttpSocket(httpSocket); + Group::from(httpSocket)->httpConnectionHandler(httpSocket); +} + +void Hub::onClientConnection(uS::Socket *s, bool error) { + HttpSocket *httpSocket = (HttpSocket *) s; + + if (error) { + httpSocket->onEnd(httpSocket); + } else { + httpSocket->setState>(); + httpSocket->change(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + httpSocket->setNoDelay(true); + httpSocket->upgrade(nullptr, nullptr, 0, nullptr, 0, nullptr); + } +} + +bool Hub::listen(const char *host, int port, uS::TLS::Context sslContext, int options, Group *eh) { + if (!eh) { + eh = (Group *) this; + } + + if (uS::Node::listen(host, port, sslContext, options, (uS::NodeData *) eh, nullptr)) { + eh->errorHandler(port); + return false; + } + return true; +} + +bool Hub::listen(int port, uS::TLS::Context sslContext, int options, Group *eh) { + return listen(nullptr, port, sslContext, options, eh); +} + +uS::Socket *allocateHttpSocket(uS::Socket *s) { + return (uS::Socket *) new HttpSocket(s); +} + +void Hub::connect(std::string uri, void *user, std::map extraHeaders, int timeoutMs, Group *eh) { + if (!eh) { + eh = (Group *) this; + } + + size_t offset = 0; + std::string protocol = uri.substr(offset, uri.find("://")), hostname, portStr, path; + if ((offset += protocol.length() + 3) < uri.length()) { + hostname = uri.substr(offset, uri.find_first_of(":/", offset) - offset); + + offset += hostname.length(); + if (uri[offset] == ':') { + offset++; + portStr = uri.substr(offset, uri.find("/", offset) - offset); + } + + offset += portStr.length(); + if (uri[offset] == '/') { + path = uri.substr(++offset); + } + } + + if (hostname.length()) { + int port = 80; + bool secure = false; + if (protocol == "wss") { + port = 443; + secure = true; + } else if (protocol != "ws") { + eh->errorHandler(user); + } + + if (portStr.length()) { + port = stoi(portStr); + } + + HttpSocket *httpSocket = (HttpSocket *) uS::Node::connect(hostname.c_str(), port, secure, eh); + if (httpSocket) { + // startTimeout occupies the user + httpSocket->startTimeout::onEnd>(timeoutMs); + httpSocket->httpUser = user; + + std::string randomKey = "x3JJHMbDL1EzLkh9GBhXDw=="; +// for (int i = 0; i < 22; i++) { +// randomKey[i] = rand() % +// } + + httpSocket->httpBuffer = "GET /" + path + " HTTP/1.1\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: " + randomKey + "\r\n" + "Host: " + hostname + "\r\n" + + "Sec-WebSocket-Version: 13\r\n"; + + for (std::pair header : extraHeaders) { + httpSocket->httpBuffer += header.first + ": " + header.second + "\r\n"; + } + + httpSocket->httpBuffer += "\r\n"; + } else { + eh->errorHandler(user); + } + } else { + eh->errorHandler(user); + } +} + +void Hub::upgrade(uv_os_sock_t fd, const char *secKey, SSL *ssl, const char *extensions, size_t extensionsLength, const char *subprotocol, size_t subprotocolLength, Group *serverGroup) { + if (!serverGroup) { + serverGroup = &getDefaultGroup(); + } + + uS::Socket s((uS::NodeData *) serverGroup, serverGroup->loop, fd, ssl); + s.setNoDelay(true); + + // todo: skip httpSocket -> it cannot fail anyways! + HttpSocket *httpSocket = new HttpSocket(&s); + httpSocket->setState>(); + httpSocket->change(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + bool perMessageDeflate; + httpSocket->upgrade(secKey, extensions, extensionsLength, subprotocol, subprotocolLength, &perMessageDeflate); + + WebSocket *webSocket = new WebSocket(perMessageDeflate, httpSocket); + delete httpSocket; + webSocket->setState>(); + webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE)); + serverGroup->addWebSocket(webSocket); + serverGroup->connectionHandler(webSocket, {}); +} + +} diff --git a/node_modules/uws/src/Hub.h b/node_modules/uws/src/Hub.h new file mode 100644 index 0000000..f879579 --- /dev/null +++ b/node_modules/uws/src/Hub.h @@ -0,0 +1,97 @@ +#ifndef HUB_UWS_H +#define HUB_UWS_H + +#include "Group.h" +#include "Node.h" +#include +#include +#include +#include + +namespace uWS { + +struct WIN32_EXPORT Hub : private uS::Node, public Group, public Group { +protected: + struct ConnectionData { + std::string path; + void *user; + Group *group; + }; + + z_stream inflationStream = {}; + char *inflationBuffer; + char *inflate(char *data, size_t &length, size_t maxPayload); + std::string dynamicInflationBuffer; + static const int LARGE_BUFFER_SIZE = 300 * 1024; + + static void onServerAccept(uS::Socket *s); + static void onClientConnection(uS::Socket *s, bool error); + +public: + template + Group *createGroup(int extensionOptions = 0, unsigned int maxPayload = 16777216) { + return new Group(extensionOptions, maxPayload, this, nodeData); + } + + template + Group &getDefaultGroup() { + return static_cast &>(*this); + } + + bool listen(int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group *eh = nullptr); + bool listen(const char *host, int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group *eh = nullptr); + void connect(std::string uri, void *user = nullptr, std::map extraHeaders = {}, int timeoutMs = 5000, Group *eh = nullptr); + void upgrade(uv_os_sock_t fd, const char *secKey, SSL *ssl, const char *extensions, size_t extensionsLength, const char *subprotocol, size_t subprotocolLength, Group *serverGroup = nullptr); + + Hub(int extensionOptions = 0, bool useDefaultLoop = false, unsigned int maxPayload = 16777216) : uS::Node(LARGE_BUFFER_SIZE, WebSocketProtocol>::CONSUME_PRE_PADDING, WebSocketProtocol>::CONSUME_POST_PADDING, useDefaultLoop), + Group(extensionOptions, maxPayload, this, nodeData), Group(0, maxPayload, this, nodeData) { + inflateInit2(&inflationStream, -15); + inflationBuffer = new char[LARGE_BUFFER_SIZE]; + +#ifdef UWS_THREADSAFE + getLoop()->preCbData = nodeData; + getLoop()->preCb = [](void *nodeData) { + static_cast(nodeData)->asyncMutex->lock(); + }; + + getLoop()->postCbData = nodeData; + getLoop()->postCb = [](void *nodeData) { + static_cast(nodeData)->asyncMutex->unlock(); + }; +#endif + } + + ~Hub() { + inflateEnd(&inflationStream); + delete [] inflationBuffer; + } + + using uS::Node::run; + using uS::Node::getLoop; + using Group::onConnection; + using Group::onConnection; + using Group::onTransfer; + using Group::onMessage; + using Group::onMessage; + using Group::onDisconnection; + using Group::onDisconnection; + using Group::onPing; + using Group::onPing; + using Group::onPong; + using Group::onPong; + using Group::onError; + using Group::onError; + using Group::onHttpRequest; + using Group::onHttpData; + using Group::onHttpConnection; + using Group::onHttpDisconnection; + using Group::onHttpUpgrade; + using Group::onCancelledHttpRequest; + + friend struct WebSocket; + friend struct WebSocket; +}; + +} + +#endif // HUB_UWS_H diff --git a/node_modules/uws/src/Libuv.h b/node_modules/uws/src/Libuv.h new file mode 100644 index 0000000..7a71c53 --- /dev/null +++ b/node_modules/uws/src/Libuv.h @@ -0,0 +1,175 @@ +#ifndef LIBUV_H +#define LIBUV_H + +#include +static_assert (UV_VERSION_MINOR >= 3, "µWebSockets requires libuv >=1.3.0"); + +struct Loop : uv_loop_t { + static Loop *createLoop(bool defaultLoop = true) { + if (defaultLoop) { + return (Loop *) uv_default_loop(); + } else { + return (Loop *) uv_loop_new(); + } + } + + void destroy() { + if (this != uv_default_loop()) { + uv_loop_delete(this); + } + } + + void run() { + uv_run(this, UV_RUN_DEFAULT); + } +}; + +struct Async { + uv_async_t uv_async; + + Async(Loop *loop) { + uv_async.loop = loop; + } + + void start(void (*cb)(Async *)) { + uv_async_init(uv_async.loop, &uv_async, (uv_async_cb) cb); + } + + void send() { + uv_async_send(&uv_async); + } + + void close() { + uv_close((uv_handle_t *) &uv_async, [](uv_handle_t *a) { + delete (Async *) a; + }); + } + + void setData(void *data) { + uv_async.data = data; + } + + void *getData() { + return uv_async.data; + } +}; + +struct Timer { + uv_timer_t uv_timer; + + Timer(Loop *loop) { + uv_timer_init(loop, &uv_timer); + } + + void start(void (*cb)(Timer *), int first, int repeat) { + uv_timer_start(&uv_timer, (uv_timer_cb) cb, first, repeat); + } + + void setData(void *data) { + uv_timer.data = data; + } + + void *getData() { + return uv_timer.data; + } + + void stop() { + uv_timer_stop(&uv_timer); + } + + void close() { + uv_close((uv_handle_t *) &uv_timer, [](uv_handle_t *t) { + delete (Timer *) t; + }); + } + +private: + ~Timer() { + + } +}; + +struct Poll { + uv_poll_t *uv_poll; + void (*cb)(Poll *p, int status, int events); + + Poll(Loop *loop, uv_os_sock_t fd) { + uv_poll = new uv_poll_t; + uv_poll_init_socket(loop, uv_poll, fd); + } + + Poll(Poll &&other) { + uv_poll = other.uv_poll; + cb = other.cb; + other.uv_poll = nullptr; + } + + Poll(const Poll &other) = delete; + + ~Poll() { + delete uv_poll; + } + + bool isClosed() { + return uv_is_closing((uv_handle_t *) uv_poll); + } + + uv_os_sock_t getFd() { +#ifdef _WIN32 + uv_os_sock_t fd; + uv_fileno((uv_handle_t *) uv_poll, (uv_os_fd_t *) &fd); + return fd; +#else + return uv_poll->io_watcher.fd; +#endif + } + + void setCb(void (*cb)(Poll *p, int status, int events)) { + this->cb = cb; + } + + void (*getCb())(Poll *, int, int) { + return cb; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + delete uv_poll; + uv_poll = new uv_poll_t; + uv_poll_init_socket(loop, uv_poll, fd); + } + + void start(Loop *, Poll *self, int events) { + uv_poll->data = self; + uv_poll_start(uv_poll, events, [](uv_poll_t *p, int status, int events) { + Poll *self = (Poll *) p->data; + self->cb(self, status, events); + }); + } + + void change(Loop *, Poll *self, int events) { + start(nullptr, self, events); + } + + void stop(Loop *loop) { + uv_poll_stop(uv_poll); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + return false; + } + + bool threadSafeChange(Loop *, Poll *self, int events) { + return false; + } + + void close(Loop *loop, void (*cb)(Poll *)) { + this->cb = (void(*)(Poll *, int, int)) cb; + uv_close((uv_handle_t *) uv_poll, [](uv_handle_t *p) { + Poll *poll = (Poll *) p->data; + void (*cb)(Poll *) = (void(*)(Poll *)) poll->cb; + cb(poll); + }); + } +}; + +#endif // LIBUV_H diff --git a/node_modules/uws/src/Networking.cpp b/node_modules/uws/src/Networking.cpp new file mode 100644 index 0000000..743f83b --- /dev/null +++ b/node_modules/uws/src/Networking.cpp @@ -0,0 +1,78 @@ +#include "Networking.h" + +namespace uS { + +namespace TLS { + +Context::Context(const Context &other) +{ + if (other.context) { + context = other.context; + SSL_CTX_up_ref(context); + } +} + +Context &Context::operator=(const Context &other) { + if (other.context) { + context = other.context; + SSL_CTX_up_ref(context); + } + return *this; +} + +Context::~Context() +{ + if (context) { + SSL_CTX_free(context); + } +} + +struct Init { + Init() {SSL_library_init();} + ~Init() {/*EVP_cleanup();*/} +} init; + +Context createContext(std::string certChainFileName, std::string keyFileName, std::string keyFilePassword) +{ + Context context(SSL_CTX_new(SSLv23_server_method())); + if (!context.context) { + return nullptr; + } + + if (keyFilePassword.length()) { + context.password.reset(new std::string(keyFilePassword)); + SSL_CTX_set_default_passwd_cb_userdata(context.context, context.password.get()); + SSL_CTX_set_default_passwd_cb(context.context, Context::passwordCallback); + } + + SSL_CTX_set_options(context.context, SSL_OP_NO_SSLv3); + + if (SSL_CTX_use_certificate_chain_file(context.context, certChainFileName.c_str()) != 1) { + return nullptr; + } else if (SSL_CTX_use_PrivateKey_file(context.context, keyFileName.c_str(), SSL_FILETYPE_PEM) != 1) { + return nullptr; + } + + return context; +} + +} + +#ifndef _WIN32 +struct Init { + Init() {signal(SIGPIPE, SIG_IGN);} +} init; +#endif + +#ifdef _WIN32 +#pragma comment(lib, "Ws2_32.lib") + +struct WindowsInit { + WSADATA wsaData; + WindowsInit() {WSAStartup(MAKEWORD(2, 2), &wsaData);} + ~WindowsInit() {WSACleanup();} +} windowsInit; + +#endif + +} diff --git a/node_modules/uws/src/Networking.h b/node_modules/uws/src/Networking.h new file mode 100644 index 0000000..7ae88a2 --- /dev/null +++ b/node_modules/uws/src/Networking.h @@ -0,0 +1,259 @@ +// the purpose of this header should be to provide SSL and networking wrapped in a common interface +// it should allow cross-platform networking and SSL and also easy usage of mTCP and similar tech + +#ifndef NETWORKING_UWS_H +#define NETWORKING_UWS_H + +#include +#if OPENSSL_VERSION_NUMBER < 0x10100000L +#define SSL_CTX_up_ref(x) x->references++ +#define SSL_up_ref(x) x->references++ +#endif + +#ifndef __linux +#define MSG_NOSIGNAL 0 +#else +#include +#endif + +#ifdef __APPLE__ +#include +#define htobe64(x) OSSwapHostToBigInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#endif + +#ifdef _WIN32 +#define NOMINMAX +#include +#include +#pragma comment(lib, "ws2_32.lib") +#define SHUT_WR SD_SEND +#ifdef __MINGW32__ +// Windows has always been tied to LE +#define htobe64(x) __builtin_bswap64(x) +#define be64toh(x) __builtin_bswap64(x) +#else +#define __thread __declspec(thread) +#define htobe64(x) htonll(x) +#define be64toh(x) ntohll(x) +#define pthread_t DWORD +#define pthread_self GetCurrentThreadId +#endif +#define WIN32_EXPORT __declspec(dllexport) + +inline void close(SOCKET fd) {closesocket(fd);} +inline int setsockopt(SOCKET fd, int level, int optname, const void *optval, socklen_t optlen) { + return setsockopt(fd, level, optname, (const char *) optval, optlen); +} + +inline SOCKET dup(SOCKET socket) { + WSAPROTOCOL_INFOW pi; + if (WSADuplicateSocketW(socket, GetCurrentProcessId(), &pi) == SOCKET_ERROR) { + return INVALID_SOCKET; + } + return WSASocketW(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, WSA_FLAG_OVERLAPPED); +} +#else +#include +#include +#include +#include +#include +#include +#include +#define SOCKET_ERROR -1 +#define INVALID_SOCKET -1 +#define WIN32_EXPORT +#endif + +#include "Backend.h" +#include +#include +#include +#include +#include +#include +#include + +namespace uS { + +// todo: mark sockets nonblocking in these functions +// todo: probably merge this Context with the TLS::Context for same interface for SSL and non-SSL! +struct Context { + +#ifdef USE_MTCP + mtcp_context *mctx; +#endif + + Context() { + // mtcp_create_context +#ifdef USE_MTCP + mctx = mtcp_create_context(0); // cpu index? +#endif + } + + ~Context() { +#ifdef USE_MTCP + mtcp_destroy_context(mctx); +#endif + } + + // returns INVALID_SOCKET on error + uv_os_sock_t acceptSocket(uv_os_sock_t fd) { + uv_os_sock_t acceptedFd; +#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK) + // Linux, FreeBSD + acceptedFd = accept4(fd, nullptr, nullptr, SOCK_CLOEXEC | SOCK_NONBLOCK); +#else + // Windows, OS X + acceptedFd = accept(fd, nullptr, nullptr); +#endif + +#ifdef __APPLE__ + if (acceptedFd != INVALID_SOCKET) { + int noSigpipe = 1; + setsockopt(acceptedFd, SOL_SOCKET, SO_NOSIGPIPE, &noSigpipe, sizeof(int)); + } +#endif + return acceptedFd; + } + + // returns INVALID_SOCKET on error + uv_os_sock_t createSocket(int domain, int type, int protocol) { + int flags = 0; +#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK) + flags = SOCK_CLOEXEC | SOCK_NONBLOCK; +#endif + + uv_os_sock_t createdFd = socket(domain, type | flags, protocol); + +#ifdef __APPLE__ + if (createdFd != INVALID_SOCKET) { + int noSigpipe = 1; + setsockopt(createdFd, SOL_SOCKET, SO_NOSIGPIPE, &noSigpipe, sizeof(int)); + } +#endif + + return createdFd; + } + + void closeSocket(uv_os_sock_t fd) { +#ifdef _WIN32 + closesocket(fd); +#else + close(fd); +#endif + } + + bool wouldBlock() { +#ifdef _WIN32 + return WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EWOULDBLOCK;// || errno == EAGAIN; +#endif + } +}; + +namespace TLS { + +class WIN32_EXPORT Context { +private: + SSL_CTX *context = nullptr; + std::shared_ptr password; + + static int passwordCallback(char *buf, int size, int rwflag, void *u) + { + std::string *password = (std::string *) u; + int length = std::min(size, password->length()); + memcpy(buf, password->data(), length); + buf[length] = '\0'; + return length; + } + +public: + friend Context WIN32_EXPORT createContext(std::string certChainFileName, std::string keyFileName, std::string keyFilePassword); + Context(SSL_CTX *context) : context(context) { + + } + + Context() = default; + Context(const Context &other); + Context &operator=(const Context &other); + ~Context(); + operator bool() { + return context; + } + + SSL_CTX *getNativeContext() { + return context; + } +}; + +Context WIN32_EXPORT createContext(std::string certChainFileName, std::string keyFileName, std::string keyFilePassword = std::string()); + +} + +struct Socket; + +// NodeData is like a Context, maybe merge them? +struct WIN32_EXPORT NodeData { + char *recvBufferMemoryBlock; + char *recvBuffer; + int recvLength; + Loop *loop; + uS::Context *netContext; + void *user = nullptr; + static const int preAllocMaxSize = 1024; + char **preAlloc; + SSL_CTX *clientContext; + + Async *async = nullptr; + pthread_t tid; + + std::recursive_mutex *asyncMutex; + std::vector transferQueue; + std::vector changePollQueue; + static void asyncCallback(Async *async); + + static int getMemoryBlockIndex(size_t length) { + return (length >> 4) + bool(length & 15); + } + + char *getSmallMemoryBlock(int index) { + if (preAlloc[index]) { + char *memory = preAlloc[index]; + preAlloc[index] = nullptr; + return memory; + } else { + return new char[index << 4]; + } + } + + void freeSmallMemoryBlock(char *memory, int index) { + if (!preAlloc[index]) { + preAlloc[index] = memory; + } else { + delete [] memory; + } + } + +public: + void addAsync() { + async = new Async(loop); + async->setData(this); + async->start(NodeData::asyncCallback); + } + + void clearPendingPollChanges(Poll *p) { + asyncMutex->lock(); + changePollQueue.erase( + std::remove(changePollQueue.begin(), changePollQueue.end(), p), + changePollQueue.end() + ); + asyncMutex->unlock(); + } +}; + +} + +#endif // NETWORKING_UWS_H diff --git a/node_modules/uws/src/Node.cpp b/node_modules/uws/src/Node.cpp new file mode 100644 index 0000000..cd20e79 --- /dev/null +++ b/node_modules/uws/src/Node.cpp @@ -0,0 +1,83 @@ +#include "Node.h" + +namespace uS { + +// this should be Node +void NodeData::asyncCallback(Async *async) +{ + NodeData *nodeData = (NodeData *) async->getData(); + + nodeData->asyncMutex->lock(); + for (Poll *p : nodeData->transferQueue) { + Socket *s = (Socket *) p; + TransferData *transferData = (TransferData *) s->getUserData(); + + s->reInit(nodeData->loop, transferData->fd); + s->setCb(transferData->pollCb); + s->start(nodeData->loop, s, s->setPoll(transferData->pollEvents)); + + s->nodeData = transferData->destination; + s->setUserData(transferData->userData); + auto *transferCb = transferData->transferCb; + + delete transferData; + transferCb(s); + } + + for (Poll *p : nodeData->changePollQueue) { + Socket *s = (Socket *) p; + s->change(s->nodeData->loop, s, s->getPoll()); + } + + nodeData->changePollQueue.clear(); + nodeData->transferQueue.clear(); + nodeData->asyncMutex->unlock(); +} + +Node::Node(int recvLength, int prePadding, int postPadding, bool useDefaultLoop) { + nodeData = new NodeData; + nodeData->recvBufferMemoryBlock = new char[recvLength]; + nodeData->recvBuffer = nodeData->recvBufferMemoryBlock + prePadding; + nodeData->recvLength = recvLength - prePadding - postPadding; + + nodeData->tid = pthread_self(); + loop = Loop::createLoop(useDefaultLoop); + + // each node has a context + nodeData->netContext = new Context(); + + nodeData->loop = loop; + nodeData->asyncMutex = &asyncMutex; + + int indices = NodeData::getMemoryBlockIndex(NodeData::preAllocMaxSize) + 1; + nodeData->preAlloc = new char*[indices]; + for (int i = 0; i < indices; i++) { + nodeData->preAlloc[i] = nullptr; + } + + nodeData->clientContext = SSL_CTX_new(SSLv23_client_method()); + SSL_CTX_set_options(nodeData->clientContext, SSL_OP_NO_SSLv3); +} + +void Node::run() { + nodeData->tid = pthread_self(); + loop->run(); +} + +Node::~Node() { + delete [] nodeData->recvBufferMemoryBlock; + SSL_CTX_free(nodeData->clientContext); + + int indices = NodeData::getMemoryBlockIndex(NodeData::preAllocMaxSize) + 1; + for (int i = 0; i < indices; i++) { + if (nodeData->preAlloc[i]) { + delete [] nodeData->preAlloc[i]; + } + } + delete [] nodeData->preAlloc; + delete nodeData->netContext; + delete nodeData; + loop->destroy(); +} + +} diff --git a/node_modules/uws/src/Node.h b/node_modules/uws/src/Node.h new file mode 100644 index 0000000..3c4d3be --- /dev/null +++ b/node_modules/uws/src/Node.h @@ -0,0 +1,198 @@ +#ifndef NODE_UWS_H +#define NODE_UWS_H + +#include "Socket.h" +#include +#include + +namespace uS { + +enum ListenOptions : int { + REUSE_PORT = 1, + ONLY_IPV4 = 2 +}; + +class WIN32_EXPORT Node { +private: + template + static void connect_cb(Poll *p, int status, int events) { + C((Socket *) p, status < 0); + } + + template + static void accept_poll_cb(Poll *p, int status, int events) { + ListenSocket *listenData = (ListenSocket *) p; + accept_cb(listenData); + } + + template + static void accept_timer_cb(Timer *p) { + ListenSocket *listenData = (ListenSocket *) p->getData(); + accept_cb(listenData); + } + + template + static void accept_cb(ListenSocket *listenSocket) { + uv_os_sock_t serverFd = listenSocket->getFd(); + Context *netContext = listenSocket->nodeData->netContext; + uv_os_sock_t clientFd = netContext->acceptSocket(serverFd); + if (clientFd == INVALID_SOCKET) { + /* + * If accept is failing, the pending connection won't be removed and the + * polling will cause the server to spin, using 100% cpu. Switch to a timer + * event instead to avoid this. + */ + if (!TIMER && !netContext->wouldBlock()) { + listenSocket->stop(listenSocket->nodeData->loop); + + listenSocket->timer = new Timer(listenSocket->nodeData->loop); + listenSocket->timer->setData(listenSocket); + listenSocket->timer->start(accept_timer_cb, 1000, 1000); + } + return; + } else if (TIMER) { + listenSocket->timer->stop(); + listenSocket->timer->close(); + listenSocket->timer = nullptr; + + listenSocket->setCb(accept_poll_cb); + listenSocket->start(listenSocket->nodeData->loop, listenSocket, UV_READABLE); + } + do { + SSL *ssl = nullptr; + if (listenSocket->sslContext) { + ssl = SSL_new(listenSocket->sslContext.getNativeContext()); + SSL_set_accept_state(ssl); + } + + Socket *socket = new Socket(listenSocket->nodeData, listenSocket->nodeData->loop, clientFd, ssl); + socket->setPoll(UV_READABLE); + A(socket); + } while ((clientFd = netContext->acceptSocket(serverFd)) != INVALID_SOCKET); + } + +protected: + Loop *loop; + NodeData *nodeData; + std::recursive_mutex asyncMutex; + +public: + Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false); + ~Node(); + void run(); + + Loop *getLoop() { + return loop; + } + + template + Socket *connect(const char *hostname, int port, bool secure, NodeData *nodeData) { + Context *netContext = nodeData->netContext; + + addrinfo hints, *result; + memset(&hints, 0, sizeof(addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + if (getaddrinfo(hostname, std::to_string(port).c_str(), &hints, &result) != 0) { + return nullptr; + } + + uv_os_sock_t fd = netContext->createSocket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (fd == INVALID_SOCKET) { + freeaddrinfo(result); + return nullptr; + } + + ::connect(fd, result->ai_addr, result->ai_addrlen); + freeaddrinfo(result); + + SSL *ssl = nullptr; + if (secure) { + ssl = SSL_new(nodeData->clientContext); + SSL_set_connect_state(ssl); + SSL_set_tlsext_host_name(ssl, hostname); + } + + Socket initialSocket(nodeData, getLoop(), fd, ssl); + uS::Socket *socket = I(&initialSocket); + + socket->setCb(connect_cb); + socket->start(loop, socket, socket->setPoll(UV_WRITABLE)); + return socket; + } + + // todo: hostname, backlog + template + bool listen(const char *host, int port, uS::TLS::Context sslContext, int options, uS::NodeData *nodeData, void *user) { + addrinfo hints, *result; + memset(&hints, 0, sizeof(addrinfo)); + + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + Context *netContext = nodeData->netContext; + + if (getaddrinfo(host, std::to_string(port).c_str(), &hints, &result)) { + return true; + } + + uv_os_sock_t listenFd = SOCKET_ERROR; + addrinfo *listenAddr; + if ((options & uS::ONLY_IPV4) == 0) { + for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) { + if (a->ai_family == AF_INET6) { + listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol); + listenAddr = a; + } + } + } + + for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) { + if (a->ai_family == AF_INET) { + listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol); + listenAddr = a; + } + } + + if (listenFd == SOCKET_ERROR) { + freeaddrinfo(result); + return true; + } + +#ifdef __linux +#ifdef SO_REUSEPORT + if (options & REUSE_PORT) { + int optval = 1; + setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + } +#endif +#endif + + int enabled = true; + setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)); + + if (bind(listenFd, listenAddr->ai_addr, listenAddr->ai_addrlen) || ::listen(listenFd, 512)) { + netContext->closeSocket(listenFd); + freeaddrinfo(result); + return true; + } + + ListenSocket *listenSocket = new ListenSocket(nodeData, loop, listenFd, nullptr); + listenSocket->sslContext = sslContext; + listenSocket->nodeData = nodeData; + + listenSocket->setCb(accept_poll_cb); + listenSocket->start(loop, listenSocket, UV_READABLE); + + // should be vector of listen data! one group can have many listeners! + nodeData->user = listenSocket; + + freeaddrinfo(result); + return false; + } +}; + +} + +#endif // NODE_UWS_H diff --git a/node_modules/uws/src/Socket.cpp b/node_modules/uws/src/Socket.cpp new file mode 100644 index 0000000..c35bbf8 --- /dev/null +++ b/node_modules/uws/src/Socket.cpp @@ -0,0 +1,28 @@ +#include "Socket.h" + +namespace uS { + +Socket::Address Socket::getAddress() +{ + uv_os_sock_t fd = getFd(); + + sockaddr_storage addr; + socklen_t addrLength = sizeof(addr); + if (getpeername(fd, (sockaddr *) &addr, &addrLength) == -1) { + return {0, "", ""}; + } + + static __thread char buf[INET6_ADDRSTRLEN]; + + if (addr.ss_family == AF_INET) { + sockaddr_in *ipv4 = (sockaddr_in *) &addr; + inet_ntop(AF_INET, &ipv4->sin_addr, buf, sizeof(buf)); + return {ntohs(ipv4->sin_port), buf, "IPv4"}; + } else { + sockaddr_in6 *ipv6 = (sockaddr_in6 *) &addr; + inet_ntop(AF_INET6, &ipv6->sin6_addr, buf, sizeof(buf)); + return {ntohs(ipv6->sin6_port), buf, "IPv6"}; + } +} + +} diff --git a/node_modules/uws/src/Socket.h b/node_modules/uws/src/Socket.h new file mode 100644 index 0000000..2179ff8 --- /dev/null +++ b/node_modules/uws/src/Socket.h @@ -0,0 +1,507 @@ +#ifndef SOCKET_UWS_H +#define SOCKET_UWS_H + +#include "Networking.h" + +namespace uS { + +struct TransferData { + // Connection state + uv_os_sock_t fd; + SSL *ssl; + + // Poll state + void (*pollCb)(Poll *, int, int); + int pollEvents; + + // User state + void *userData; + + // Destination + NodeData *destination; + void (*transferCb)(Poll *); +}; + +// perfectly 64 bytes (4 + 60) +struct WIN32_EXPORT Socket : Poll { +protected: + struct { + int poll : 4; + int shuttingDown : 4; + } state = {0, false}; + + SSL *ssl; + void *user = nullptr; + NodeData *nodeData; + + // this is not needed by HttpSocket! + struct Queue { + struct Message { + const char *data; + size_t length; + Message *nextMessage = nullptr; + void (*callback)(void *socket, void *data, bool cancelled, void *reserved) = nullptr; + void *callbackData = nullptr, *reserved = nullptr; + }; + + Message *head = nullptr, *tail = nullptr; + void pop() + { + Message *nextMessage; + if ((nextMessage = head->nextMessage)) { + delete [] (char *) head; + head = nextMessage; + } else { + delete [] (char *) head; + head = tail = nullptr; + } + } + + bool empty() {return head == nullptr;} + Message *front() {return head;} + + void push(Message *message) + { + message->nextMessage = nullptr; + if (tail) { + tail->nextMessage = message; + tail = message; + } else { + head = message; + tail = message; + } + } + } messageQueue; + + int getPoll() { + return state.poll; + } + + int setPoll(int poll) { + state.poll = poll; + return poll; + } + + void setShuttingDown(bool shuttingDown) { + state.shuttingDown = shuttingDown; + } + + void transfer(NodeData *nodeData, void (*cb)(Poll *)) { + // userData is invalid from now on till onTransfer + setUserData(new TransferData({getFd(), ssl, getCb(), getPoll(), getUserData(), nodeData, cb})); + stop(this->nodeData->loop); + close(this->nodeData->loop, [](Poll *p) { + Socket *s = (Socket *) p; + TransferData *transferData = (TransferData *) s->getUserData(); + + transferData->destination->asyncMutex->lock(); + bool wasEmpty = transferData->destination->transferQueue.empty(); + transferData->destination->transferQueue.push_back(s); + transferData->destination->asyncMutex->unlock(); + + if (wasEmpty) { + transferData->destination->async->send(); + } + }); + } + + void changePoll(Socket *socket) { + if (!threadSafeChange(nodeData->loop, this, socket->getPoll())) { + if (socket->nodeData->tid != pthread_self()) { + socket->nodeData->asyncMutex->lock(); + socket->nodeData->changePollQueue.push_back(socket); + socket->nodeData->asyncMutex->unlock(); + socket->nodeData->async->send(); + } else { + change(socket->nodeData->loop, socket, socket->getPoll()); + } + } + } + + // clears user data! + template + void startTimeout(int timeoutMs = 15000) { + Timer *timer = new Timer(nodeData->loop); + timer->setData(this); + timer->start([](Timer *timer) { + Socket *s = (Socket *) timer->getData(); + s->cancelTimeout(); + onTimeout(s); + }, timeoutMs, 0); + + user = timer; + } + + void cancelTimeout() { + Timer *timer = (Timer *) getUserData(); + if (timer) { + timer->stop(); + timer->close(); + user = nullptr; + } + } + + template + static void sslIoHandler(Poll *p, int status, int events) { + Socket *socket = (Socket *) p; + + if (status < 0) { + STATE::onEnd((Socket *) p); + return; + } + + if (!socket->messageQueue.empty() && ((events & UV_WRITABLE) || SSL_want(socket->ssl) == SSL_READING)) { + socket->cork(true); + while (true) { + Queue::Message *messagePtr = socket->messageQueue.front(); + int sent = SSL_write(socket->ssl, messagePtr->data, messagePtr->length); + if (sent == (ssize_t) messagePtr->length) { + if (messagePtr->callback) { + messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved); + } + socket->messageQueue.pop(); + if (socket->messageQueue.empty()) { + if ((socket->state.poll & UV_WRITABLE) && SSL_want(socket->ssl) != SSL_WRITING) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE)); + } + break; + } + } else if (sent <= 0) { + switch (SSL_get_error(socket->ssl, sent)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((socket->getPoll() & UV_WRITABLE) == 0) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE)); + } + break; + default: + STATE::onEnd((Socket *) p); + return; + } + break; + } + } + socket->cork(false); + } + + if (events & UV_READABLE) { + do { + int length = SSL_read(socket->ssl, socket->nodeData->recvBuffer, socket->nodeData->recvLength); + if (length <= 0) { + switch (SSL_get_error(socket->ssl, length)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((socket->getPoll() & UV_WRITABLE) == 0) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE)); + } + break; + default: + STATE::onEnd((Socket *) p); + return; + } + break; + } else { + // Warning: onData can delete the socket! Happens when HttpSocket upgrades + socket = STATE::onData((Socket *) p, socket->nodeData->recvBuffer, length); + if (socket->isClosed() || socket->isShuttingDown()) { + return; + } + } + } while (SSL_pending(socket->ssl)); + } + } + + template + static void ioHandler(Poll *p, int status, int events) { + Socket *socket = (Socket *) p; + NodeData *nodeData = socket->nodeData; + Context *netContext = nodeData->netContext; + + if (status < 0) { + STATE::onEnd((Socket *) p); + return; + } + + if (events & UV_WRITABLE) { + if (!socket->messageQueue.empty() && (events & UV_WRITABLE)) { + socket->cork(true); + while (true) { + Queue::Message *messagePtr = socket->messageQueue.front(); + ssize_t sent = ::send(socket->getFd(), messagePtr->data, messagePtr->length, MSG_NOSIGNAL); + if (sent == (ssize_t) messagePtr->length) { + if (messagePtr->callback) { + messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved); + } + socket->messageQueue.pop(); + if (socket->messageQueue.empty()) { + // todo, remove bit, don't set directly + socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE)); + break; + } + } else if (sent == SOCKET_ERROR) { + if (!netContext->wouldBlock()) { + STATE::onEnd((Socket *) p); + return; + } + break; + } else { + messagePtr->length -= sent; + messagePtr->data += sent; + break; + } + } + socket->cork(false); + } + } + + if (events & UV_READABLE) { + int length = recv(socket->getFd(), nodeData->recvBuffer, nodeData->recvLength, 0); + if (length > 0) { + STATE::onData((Socket *) p, nodeData->recvBuffer, length); + } else if (length <= 0 || (length == SOCKET_ERROR && !netContext->wouldBlock())) { + STATE::onEnd((Socket *) p); + } + } + + } + + template + void setState() { + if (ssl) { + setCb(sslIoHandler); + } else { + setCb(ioHandler); + } + } + + bool hasEmptyQueue() { + return messageQueue.empty(); + } + + void enqueue(Queue::Message *message) { + messageQueue.push(message); + } + + Queue::Message *allocMessage(size_t length, const char *data = 0) { + Queue::Message *messagePtr = (Queue::Message *) new char[sizeof(Queue::Message) + length]; + messagePtr->length = length; + messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message); + messagePtr->nextMessage = nullptr; + + if (data) { + memcpy((char *) messagePtr->data, data, messagePtr->length); + } + + return messagePtr; + } + + void freeMessage(Queue::Message *message) { + delete [] (char *) message; + } + + bool write(Queue::Message *message, bool &wasTransferred) { + ssize_t sent = 0; + if (messageQueue.empty()) { + + if (ssl) { + sent = SSL_write(ssl, message->data, message->length); + if (sent == (ssize_t) message->length) { + wasTransferred = false; + return true; + } else if (sent < 0) { + switch (SSL_get_error(ssl, sent)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((getPoll() & UV_WRITABLE) == 0) { + setPoll(getPoll() | UV_WRITABLE); + changePoll(this); + } + break; + default: + return false; + } + } + } else { + sent = ::send(getFd(), message->data, message->length, MSG_NOSIGNAL); + if (sent == (ssize_t) message->length) { + wasTransferred = false; + return true; + } else if (sent == SOCKET_ERROR) { + if (!nodeData->netContext->wouldBlock()) { + return false; + } + } else { + message->length -= sent; + message->data += sent; + } + + if ((getPoll() & UV_WRITABLE) == 0) { + setPoll(getPoll() | UV_WRITABLE); + changePoll(this); + } + } + } + messageQueue.push(message); + wasTransferred = true; + return true; + } + + template + void sendTransformed(const char *message, size_t length, void(*callback)(void *socket, void *data, bool cancelled, void *reserved), void *callbackData, D transformData) { + size_t estimatedLength = T::estimate(message, length) + sizeof(Queue::Message); + + if (hasEmptyQueue()) { + if (estimatedLength <= uS::NodeData::preAllocMaxSize) { + int memoryLength = estimatedLength; + int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength); + + Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex); + messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, callbackData, true, nullptr); + } + } + } else { + Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message)); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, true, nullptr); + } + } + } + } else { + Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message)); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + enqueue(messagePtr); + } + } + +public: + Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), ssl(ssl), nodeData(nodeData) { + if (ssl) { + // OpenSSL treats SOCKETs as int + SSL_set_fd(ssl, (int) fd); + SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS); + } + } + + NodeData *getNodeData() { + return nodeData; + } + + Poll *next = nullptr, *prev = nullptr; + + void *getUserData() { + return user; + } + + void setUserData(void *user) { + this->user = user; + } + + struct Address { + unsigned int port; + const char *address; + const char *family; + }; + + Address getAddress(); + + void setNoDelay(int enable) { + setsockopt(getFd(), IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); + } + + void cork(int enable) { +#if defined(TCP_CORK) + // Linux & SmartOS have proper TCP_CORK + setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, &enable, sizeof(int)); +#elif defined(TCP_NOPUSH) + // Mac OS X & FreeBSD have TCP_NOPUSH + setsockopt(getFd(), IPPROTO_TCP, TCP_NOPUSH, &enable, sizeof(int)); + if (!enable) { + // Tested on OS X, FreeBSD situation is unclear + ::send(getFd(), "", 0, MSG_NOSIGNAL); + } +#endif + } + + void shutdown() { + if (ssl) { + //todo: poll in/out - have the io_cb recall shutdown if failed + SSL_shutdown(ssl); + } else { + ::shutdown(getFd(), SHUT_WR); + } + } + + template + void closeSocket() { + uv_os_sock_t fd = getFd(); + Context *netContext = nodeData->netContext; + stop(nodeData->loop); + netContext->closeSocket(fd); + + if (ssl) { + SSL_free(ssl); + } + + Poll::close(nodeData->loop, [](Poll *p) { + delete (T *) p; + }); + } + + bool isShuttingDown() { + return state.shuttingDown; + } + + friend class Node; + friend struct NodeData; +}; + +struct ListenSocket : Socket { + + ListenSocket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Socket(nodeData, loop, fd, ssl) { + + } + + Timer *timer = nullptr; + uS::TLS::Context sslContext; +}; + +} + +#endif // SOCKET_UWS_H diff --git a/node_modules/uws/src/WebSocket.cpp b/node_modules/uws/src/WebSocket.cpp new file mode 100644 index 0000000..89ac23a --- /dev/null +++ b/node_modules/uws/src/WebSocket.cpp @@ -0,0 +1,405 @@ +#include "WebSocket.h" +#include "Group.h" +#include "Hub.h" + +namespace uWS { + +/* + * Frames and sends a WebSocket message. + * + * Hints: Consider using any of the prepare function if any of their + * use cases match what you are trying to achieve (pub/sub, broadcast) + * + * Thread safe + * + */ +template +void WebSocket::send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved), void *callbackData) { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*nodeData->asyncMutex); + if (isClosed()) { + if (callback) { + callback(this, callbackData, true, nullptr); + } + return; + } +#endif + + const int HEADER_LENGTH = WebSocketProtocol>::LONG_MESSAGE_HEADER; + + struct TransformData { + OpCode opCode; + } transformData = {opCode}; + + struct WebSocketTransformer { + static size_t estimate(const char *data, size_t length) { + return length + HEADER_LENGTH; + } + + static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) { + return WebSocketProtocol>::formatMessage(dst, src, length, transformData.opCode, length, false); + } + }; + + sendTransformed((char *) message, length, (void(*)(void *, void *, bool, void *)) callback, callbackData, transformData); +} + +/* + * Prepares a single message for use with sendPrepared. + * + * Hints: Useful in cases where you need to send the same message to many + * recipients. Do not use when only sending one message. + * + * Thread safe + * + */ +template +typename WebSocket::PreparedMessage *WebSocket::prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved)) { + PreparedMessage *preparedMessage = new PreparedMessage; + preparedMessage->buffer = new char[length + 10]; + preparedMessage->length = WebSocketProtocol>::formatMessage(preparedMessage->buffer, data, length, opCode, length, compressed); + preparedMessage->references = 1; + preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback; + return preparedMessage; +} + +/* + * Prepares a batch of messages to send as one single TCP packet / syscall. + * + * Hints: Useful when doing pub/sub-like broadcasts where many recipients should receive many + * messages. Do not use if only sending one message. + * + * Thread safe + * + */ +template +typename WebSocket::PreparedMessage *WebSocket::prepareMessageBatch(std::vector &messages, std::vector &excludedMessages, OpCode opCode, bool compressed, void (*callback)(WebSocket *, void *, bool, void *)) +{ + // should be sent in! + size_t batchLength = 0; + for (size_t i = 0; i < messages.size(); i++) { + batchLength += messages[i].length(); + } + + PreparedMessage *preparedMessage = new PreparedMessage; + preparedMessage->buffer = new char[batchLength + 10 * messages.size()]; + + int offset = 0; + for (size_t i = 0; i < messages.size(); i++) { + offset += WebSocketProtocol>::formatMessage(preparedMessage->buffer + offset, messages[i].data(), messages[i].length(), opCode, messages[i].length(), compressed); + } + preparedMessage->length = offset; + preparedMessage->references = 1; + preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback; + return preparedMessage; +} + +/* + * Sends a prepared message. + * + * Hints: Used to improve broadcasting and similar use cases where the same + * message is sent to multiple recipients. Do not used if only sending one message + * in total. + * + * Warning: Modifies passed PreparedMessage and is thus not thread safe. Other + * data is also modified and it makes sense to not make this function thread-safe + * since it is a central part in broadcasting and other high-perf code paths. + * + */ +template +void WebSocket::sendPrepared(typename WebSocket::PreparedMessage *preparedMessage, void *callbackData) { + // todo: see if this can be made a transformer instead + preparedMessage->references++; + void (*callback)(void *webSocket, void *userData, bool cancelled, void *reserved) = [](void *webSocket, void *userData, bool cancelled, void *reserved) { + PreparedMessage *preparedMessage = (PreparedMessage *) userData; + bool lastReference = !--preparedMessage->references; + + if (preparedMessage->callback) { + preparedMessage->callback(webSocket, reserved, cancelled, (void *) lastReference); + } + + if (lastReference) { + delete [] preparedMessage->buffer; + delete preparedMessage; + } + }; + + // candidate for fixed size pool allocator + int memoryLength = sizeof(Queue::Message); + int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength); + + Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex); + messagePtr->data = preparedMessage->buffer; + messagePtr->length = preparedMessage->length; + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, preparedMessage, false, callbackData); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = preparedMessage; + messagePtr->reserved = callbackData; + } + } else { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, preparedMessage, true, callbackData); + } + } +} + +/* + * Decrements the reference count of passed PreparedMessage. On zero references + * the memory will be deleted. + * + * Hints: Used together with prepareMessage, prepareMessageBatch and similar calls. + * + * Warning: Will modify passed PrepareMessage and is thus not thread safe by itself. + * + */ +template +void WebSocket::finalizeMessage(typename WebSocket::PreparedMessage *preparedMessage) { + if (!--preparedMessage->references) { + delete [] preparedMessage->buffer; + delete preparedMessage; + } +} + +template +uS::Socket *WebSocket::onData(uS::Socket *s, char *data, size_t length) { + WebSocket *webSocket = static_cast *>(s); + + webSocket->hasOutstandingPong = false; + if (!webSocket->isShuttingDown()) { + webSocket->cork(true); + WebSocketProtocol>::consume(data, length, webSocket); + if (!webSocket->isClosed()) { + webSocket->cork(false); + } + } + + return webSocket; +} + +/* + * Immediately terminates this WebSocket. Will call onDisconnection of its Group. + * + * Hints: Close code will be 1006 and message will be empty. + * + */ +template +void WebSocket::terminate() { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*nodeData->asyncMutex); + if (isClosed()) { + return; + } +#endif + + WebSocket::onEnd(this); +} + +/* + * Transfers this WebSocket from its current Group to specified Group. + * + * Receiving Group has to have called listen(uWS::TRANSFERS) prior. + * + * Hints: Useful to implement subprotocols on the same thread and Loop + * or to transfer WebSockets between threads at any point (dynamic load balancing). + * + * Warning: From the point of call to the point of onTransfer, this WebSocket + * is invalid and cannot be used. What you put in is not guaranteed to be what you + * get in onTransfer, the only guaranteed consistency is passed userData is the userData + * of given WebSocket in onTransfer. Use setUserData and getUserData to identify the WebSocket. + */ +template +void WebSocket::transfer(Group *group) { + Group::from(this)->removeWebSocket(this); + if (group->loop == Group::from(this)->loop) { + // fast path + this->nodeData = group; + Group::from(this)->addWebSocket(this); + Group::from(this)->transferHandler(this); + } else { + // slow path + uS::Socket::transfer((uS::NodeData *) group, [](Poll *p) { + WebSocket *webSocket = (WebSocket *) p; + Group::from(webSocket)->addWebSocket(webSocket); + Group::from(webSocket)->transferHandler(webSocket); + }); + } +} + +/* + * Immediately calls onDisconnection of its Group and begins a passive + * WebSocket closedown handshake in the background (might succeed or not, + * we don't care). + * + * Hints: Close code and message will be what you pass yourself. + * + */ +template +void WebSocket::close(int code, const char *message, size_t length) { + + // startTimeout is not thread safe + + static const int MAX_CLOSE_PAYLOAD = 123; + length = std::min(MAX_CLOSE_PAYLOAD, length); + Group::from(this)->removeWebSocket(this); + Group::from(this)->disconnectionHandler(this, code, (char *) message, length); + setShuttingDown(true); + + // todo: using the shared timer in the group, we can skip creating a new timer per socket + // only this line and the one in Hub::connect uses the timeout feature + startTimeout::onEnd>(); + + char closePayload[MAX_CLOSE_PAYLOAD + 2]; + int closePayloadLength = WebSocketProtocol>::formatClosePayload(closePayload, code, message, length); + send(closePayload, closePayloadLength, OpCode::CLOSE, [](WebSocket *p, void *data, bool cancelled, void *reserved) { + if (!cancelled) { + p->shutdown(); + } + }); +} + +template +void WebSocket::onEnd(uS::Socket *s) { + WebSocket *webSocket = static_cast *>(s); + + if (!webSocket->isShuttingDown()) { + Group::from(webSocket)->removeWebSocket(webSocket); + Group::from(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0); + } else { + webSocket->cancelTimeout(); + } + + webSocket->template closeSocket>(); + + while (!webSocket->messageQueue.empty()) { + Queue::Message *message = webSocket->messageQueue.front(); + if (message->callback) { + message->callback(nullptr, message->callbackData, true, nullptr); + } + webSocket->messageQueue.pop(); + } + + webSocket->nodeData->clearPendingPollChanges(webSocket); +} + +template +bool WebSocket::handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + Group *group = Group::from(webSocket); + + if (opCode < 3) { + if (!remainingBytes && fin && !webSocket->fragmentBuffer.length()) { + if (webSocket->compressionStatus == WebSocket::CompressionStatus::COMPRESSED_FRAME) { + webSocket->compressionStatus = WebSocket::CompressionStatus::ENABLED; + data = group->hub->inflate(data, length, group->maxPayload); + if (!data) { + forceClose(webSocketState); + return true; + } + } + + if (opCode == 1 && !WebSocketProtocol>::isValidUtf8((unsigned char *) data, length)) { + forceClose(webSocketState); + return true; + } + + group->messageHandler(webSocket, data, length, (OpCode) opCode); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else { + webSocket->fragmentBuffer.append(data, length); + if (!remainingBytes && fin) { + length = webSocket->fragmentBuffer.length(); + if (webSocket->compressionStatus == WebSocket::CompressionStatus::COMPRESSED_FRAME) { + webSocket->compressionStatus = WebSocket::CompressionStatus::ENABLED; + webSocket->fragmentBuffer.append("...."); + data = group->hub->inflate((char *) webSocket->fragmentBuffer.data(), length, group->maxPayload); + if (!data) { + forceClose(webSocketState); + return true; + } + } else { + data = (char *) webSocket->fragmentBuffer.data(); + } + + if (opCode == 1 && !WebSocketProtocol>::isValidUtf8((unsigned char *) data, length)) { + forceClose(webSocketState); + return true; + } + + group->messageHandler(webSocket, data, length, (OpCode) opCode); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + webSocket->fragmentBuffer.clear(); + } + } + } else { + if (!remainingBytes && fin && !webSocket->controlTipLength) { + if (opCode == CLOSE) { + typename WebSocketProtocol>::CloseFrame closeFrame = WebSocketProtocol>::parseClosePayload(data, length); + webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length); + return true; + } else { + if (opCode == PING) { + webSocket->send(data, length, (OpCode) OpCode::PONG); + group->pingHandler(webSocket, data, length); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else if (opCode == PONG) { + group->pongHandler(webSocket, data, length); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } + } + } else { + webSocket->fragmentBuffer.append(data, length); + webSocket->controlTipLength += length; + + if (!remainingBytes && fin) { + char *controlBuffer = (char *) webSocket->fragmentBuffer.data() + webSocket->fragmentBuffer.length() - webSocket->controlTipLength; + if (opCode == CLOSE) { + typename WebSocketProtocol>::CloseFrame closeFrame = WebSocketProtocol>::parseClosePayload(controlBuffer, webSocket->controlTipLength); + webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length); + return true; + } else { + if (opCode == PING) { + webSocket->send(controlBuffer, webSocket->controlTipLength, (OpCode) OpCode::PONG); + group->pingHandler(webSocket, controlBuffer, webSocket->controlTipLength); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else if (opCode == PONG) { + group->pongHandler(webSocket, controlBuffer, webSocket->controlTipLength); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } + } + + webSocket->fragmentBuffer.resize(webSocket->fragmentBuffer.length() - webSocket->controlTipLength); + webSocket->controlTipLength = 0; + } + } + } + + return false; +} + +template struct WebSocket; +template struct WebSocket; + +} diff --git a/node_modules/uws/src/WebSocket.h b/node_modules/uws/src/WebSocket.h new file mode 100644 index 0000000..9e7f547 --- /dev/null +++ b/node_modules/uws/src/WebSocket.h @@ -0,0 +1,89 @@ +#ifndef WEBSOCKET_UWS_H +#define WEBSOCKET_UWS_H + +#include "WebSocketProtocol.h" +#include "Socket.h" + +namespace uWS { + +template +struct Group; + +template +struct HttpSocket; + +template +struct WIN32_EXPORT WebSocket : uS::Socket, WebSocketState { +protected: + std::string fragmentBuffer; + enum CompressionStatus : char { + DISABLED, + ENABLED, + COMPRESSED_FRAME + } compressionStatus; + unsigned char controlTipLength = 0, hasOutstandingPong = false; + + WebSocket(bool perMessageDeflate, uS::Socket *socket) : uS::Socket(std::move(*socket)) { + compressionStatus = perMessageDeflate ? CompressionStatus::ENABLED : CompressionStatus::DISABLED; + } + + static uS::Socket *onData(uS::Socket *s, char *data, size_t length); + static void onEnd(uS::Socket *s); + using uS::Socket::closeSocket; + + static bool refusePayloadLength(uint64_t length, WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + return length > Group::from(webSocket)->maxPayload; + } + + static bool setCompressed(WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + + if (webSocket->compressionStatus == WebSocket::CompressionStatus::ENABLED) { + webSocket->compressionStatus = WebSocket::CompressionStatus::COMPRESSED_FRAME; + return true; + } else { + return false; + } + } + + static void forceClose(WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + webSocket->terminate(); + } + + static bool handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState *webSocketState); + +public: + struct PreparedMessage { + char *buffer; + size_t length; + int references; + void(*callback)(void *webSocket, void *data, bool cancelled, void *reserved); + }; + + // Not thread safe + void sendPrepared(PreparedMessage *preparedMessage, void *callbackData = nullptr); + static void finalizeMessage(PreparedMessage *preparedMessage); + void close(int code = 1000, const char *message = nullptr, size_t length = 0); + void transfer(Group *group); + + // Thread safe + void terminate(); + void ping(const char *message) {send(message, OpCode::PING);} + void send(const char *message, OpCode opCode = OpCode::TEXT) {send(message, strlen(message), opCode);} + void send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr, void *callbackData = nullptr); + static PreparedMessage *prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr); + static PreparedMessage *prepareMessageBatch(std::vector &messages, std::vector &excludedMessages, + OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr); + + friend struct Hub; + friend struct Group; + friend struct HttpSocket; + friend struct uS::Socket; + friend class WebSocketProtocol>; +}; + +} + +#endif // WEBSOCKET_UWS_H diff --git a/node_modules/uws/src/WebSocketProtocol.h b/node_modules/uws/src/WebSocketProtocol.h new file mode 100644 index 0000000..b7b4695 --- /dev/null +++ b/node_modules/uws/src/WebSocketProtocol.h @@ -0,0 +1,377 @@ +#ifndef WEBSOCKETPROTOCOL_UWS_H +#define WEBSOCKETPROTOCOL_UWS_H + +// we do need to include this for htobe64, should be moved from networking! +#include "Networking.h" + +#include +#include + +namespace uWS { + +enum OpCode : unsigned char { + TEXT = 1, + BINARY = 2, + CLOSE = 8, + PING = 9, + PONG = 10 +}; + +enum { + CLIENT, + SERVER +}; + +// 24 bytes perfectly +template +struct WebSocketState { +public: + static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2; + static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4; + static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10; + + // 16 bytes + struct State { + unsigned int wantsHead : 1; + unsigned int spillLength : 4; + int opStack : 2; // -1, 0, 1 + unsigned int lastFin : 1; + + // 15 bytes + unsigned char spill[LONG_MESSAGE_HEADER - 1]; + OpCode opCode[2]; + + State() { + wantsHead = true; + spillLength = 0; + opStack = -1; + lastFin = true; + } + + } state; + + // 8 bytes + unsigned int remainingBytes = 0; + char mask[isServer ? 4 : 1]; +}; + +template +class WIN32_EXPORT WebSocketProtocol { +public: + static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2; + static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4; + static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10; + +private: + static inline bool isFin(char *frame) {return *((unsigned char *) frame) & 128;} + static inline unsigned char getOpCode(char *frame) {return *((unsigned char *) frame) & 15;} + static inline unsigned char payloadLength(char *frame) {return ((unsigned char *) frame)[1] & 127;} + static inline bool rsv23(char *frame) {return *((unsigned char *) frame) & 48;} + static inline bool rsv1(char *frame) {return *((unsigned char *) frame) & 64;} + + static inline void unmaskImprecise(char *dst, char *src, char *mask, unsigned int length) { + for (unsigned int n = (length >> 2) + 1; n; n--) { + *(dst++) = *(src++) ^ mask[0]; + *(dst++) = *(src++) ^ mask[1]; + *(dst++) = *(src++) ^ mask[2]; + *(dst++) = *(src++) ^ mask[3]; + } + } + + static inline void unmaskImpreciseCopyMask(char *dst, char *src, char *maskPtr, unsigned int length) { + char mask[4] = {maskPtr[0], maskPtr[1], maskPtr[2], maskPtr[3]}; + unmaskImprecise(dst, src, mask, length); + } + + static inline void rotateMask(unsigned int offset, char *mask) { + char originalMask[4] = {mask[0], mask[1], mask[2], mask[3]}; + mask[(0 + offset) % 4] = originalMask[0]; + mask[(1 + offset) % 4] = originalMask[1]; + mask[(2 + offset) % 4] = originalMask[2]; + mask[(3 + offset) % 4] = originalMask[3]; + } + + static inline void unmaskInplace(char *data, char *stop, char *mask) { + while (data < stop) { + *(data++) ^= mask[0]; + *(data++) ^= mask[1]; + *(data++) ^= mask[2]; + *(data++) ^= mask[3]; + } + } + + enum { + SND_CONTINUATION = 1, + SND_NO_FIN = 2, + SND_COMPRESSED = 64 + }; + + template + static inline bool consumeMessage(T payLength, char *&src, unsigned int &length, WebSocketState *wState) { + if (getOpCode(src)) { + if (wState->state.opStack == 1 || (!wState->state.lastFin && getOpCode(src) < 2)) { + Impl::forceClose(wState); + return true; + } + wState->state.opCode[++wState->state.opStack] = (OpCode) getOpCode(src); + } else if (wState->state.opStack == -1) { + Impl::forceClose(wState); + return true; + } + wState->state.lastFin = isFin(src); + + if (Impl::refusePayloadLength(payLength, wState)) { + Impl::forceClose(wState); + return true; + } + + if (payLength + MESSAGE_HEADER <= length) { + if (isServer) { + unmaskImpreciseCopyMask(src + MESSAGE_HEADER - 4, src + MESSAGE_HEADER, src + MESSAGE_HEADER - 4, payLength); + if (Impl::handleFragment(src + MESSAGE_HEADER - 4, payLength, 0, wState->state.opCode[wState->state.opStack], isFin(src), wState)) { + return true; + } + } else { + if (Impl::handleFragment(src + MESSAGE_HEADER, payLength, 0, wState->state.opCode[wState->state.opStack], isFin(src), wState)) { + return true; + } + } + + if (isFin(src)) { + wState->state.opStack--; + } + + src += payLength + MESSAGE_HEADER; + length -= payLength + MESSAGE_HEADER; + wState->state.spillLength = 0; + return false; + } else { + wState->state.spillLength = 0; + wState->state.wantsHead = false; + wState->remainingBytes = payLength - length + MESSAGE_HEADER; + bool fin = isFin(src); + if (isServer) { + memcpy(wState->mask, src + MESSAGE_HEADER - 4, 4); + unmaskImprecise(src, src + MESSAGE_HEADER, wState->mask, length - MESSAGE_HEADER); + rotateMask(4 - (length - MESSAGE_HEADER) % 4, wState->mask); + } else { + src += MESSAGE_HEADER; + } + Impl::handleFragment(src, length - MESSAGE_HEADER, wState->remainingBytes, wState->state.opCode[wState->state.opStack], fin, wState); + return true; + } + } + + static inline bool consumeContinuation(char *&src, unsigned int &length, WebSocketState *wState) { + if (wState->remainingBytes <= length) { + if (isServer) { + int n = wState->remainingBytes >> 2; + unmaskInplace(src, src + n * 4, wState->mask); + for (int i = 0, s = wState->remainingBytes % 4; i < s; i++) { + src[n * 4 + i] ^= wState->mask[i]; + } + } + + if (Impl::handleFragment(src, wState->remainingBytes, 0, wState->state.opCode[wState->state.opStack], wState->state.lastFin, wState)) { + return false; + } + + if (wState->state.lastFin) { + wState->state.opStack--; + } + + src += wState->remainingBytes; + length -= wState->remainingBytes; + wState->state.wantsHead = true; + return true; + } else { + if (isServer) { + unmaskInplace(src, src + ((length >> 2) + 1) * 4, wState->mask); + } + + wState->remainingBytes -= length; + if (Impl::handleFragment(src, length, wState->remainingBytes, wState->state.opCode[wState->state.opStack], wState->state.lastFin, wState)) { + return false; + } + + if (isServer && length % 4) { + rotateMask(4 - (length % 4), wState->mask); + } + return false; + } + } + +public: + WebSocketProtocol() { + + } + + // Based on utf8_check.c by Markus Kuhn, 2005 + // https://www.cl.cam.ac.uk/~mgk25/ucs/utf8_check.c + // Optimized for predominantly 7-bit content by Alex Hultman, 2016 + // Licensed as Zlib, like the rest of this project + static bool isValidUtf8(unsigned char *s, size_t length) + { + for (unsigned char *e = s + length; s != e; ) { + if (s + 4 <= e && ((*(uint32_t *) s) & 0x80808080) == 0) { + s += 4; + } else { + while (!(*s & 0x80)) { + if (++s == e) { + return true; + } + } + + if ((s[0] & 0x60) == 0x40) { + if (s + 1 >= e || (s[1] & 0xc0) != 0x80 || (s[0] & 0xfe) == 0xc0) { + return false; + } + s += 2; + } else if ((s[0] & 0xf0) == 0xe0) { + if (s + 2 >= e || (s[1] & 0xc0) != 0x80 || (s[2] & 0xc0) != 0x80 || + (s[0] == 0xe0 && (s[1] & 0xe0) == 0x80) || (s[0] == 0xed && (s[1] & 0xe0) == 0xa0)) { + return false; + } + s += 3; + } else if ((s[0] & 0xf8) == 0xf0) { + if (s + 3 >= e || (s[1] & 0xc0) != 0x80 || (s[2] & 0xc0) != 0x80 || (s[3] & 0xc0) != 0x80 || + (s[0] == 0xf0 && (s[1] & 0xf0) == 0x80) || (s[0] == 0xf4 && s[1] > 0x8f) || s[0] > 0xf4) { + return false; + } + s += 4; + } else { + return false; + } + } + } + return true; + } + + struct CloseFrame { + uint16_t code; + char *message; + size_t length; + }; + + static inline CloseFrame parseClosePayload(char *src, size_t length) { + CloseFrame cf = {}; + if (length >= 2) { + memcpy(&cf.code, src, 2); + cf = {ntohs(cf.code), src + 2, length - 2}; + if (cf.code < 1000 || cf.code > 4999 || (cf.code > 1011 && cf.code < 4000) || + (cf.code >= 1004 && cf.code <= 1006) || !isValidUtf8((unsigned char *) cf.message, cf.length)) { + return {}; + } + } + return cf; + } + + static inline size_t formatClosePayload(char *dst, uint16_t code, const char *message, size_t length) { + if (code) { + code = htons(code); + memcpy(dst, &code, 2); + memcpy(dst + 2, message, length); + return length + 2; + } + return 0; + } + + static inline size_t formatMessage(char *dst, const char *src, size_t length, OpCode opCode, size_t reportedLength, bool compressed) { + size_t messageLength; + size_t headerLength; + if (reportedLength < 126) { + headerLength = 2; + dst[1] = reportedLength; + } else if (reportedLength <= UINT16_MAX) { + headerLength = 4; + dst[1] = 126; + *((uint16_t *) &dst[2]) = htons(reportedLength); + } else { + headerLength = 10; + dst[1] = 127; + *((uint64_t *) &dst[2]) = htobe64(reportedLength); + } + + int flags = 0; + dst[0] = (flags & SND_NO_FIN ? 0 : 128) | (compressed ? SND_COMPRESSED : 0); + if (!(flags & SND_CONTINUATION)) { + dst[0] |= opCode; + } + + char mask[4]; + if (!isServer) { + dst[1] |= 0x80; + uint32_t random = rand(); + memcpy(mask, &random, 4); + memcpy(dst + headerLength, &random, 4); + headerLength += 4; + } + + messageLength = headerLength + length; + memcpy(dst + headerLength, src, length); + + if (!isServer) { + + // overwrites up to 3 bytes outside of the given buffer! + //WebSocketProtocol::unmaskInplace(dst + headerLength, dst + headerLength + length, mask); + + // this is not optimal + char *start = dst + headerLength; + char *stop = start + length; + int i = 0; + while (start != stop) { + (*start++) ^= mask[i++ % 4]; + } + } + return messageLength; + } + + static inline void consume(char *src, unsigned int length, WebSocketState *wState) { + if (wState->state.spillLength) { + src -= wState->state.spillLength; + length += wState->state.spillLength; + memcpy(src, wState->state.spill, wState->state.spillLength); + } + if (wState->state.wantsHead) { + parseNext: + while (length >= SHORT_MESSAGE_HEADER) { + + // invalid reserved bits / invalid opcodes / invalid control frames / set compressed frame + if ((rsv1(src) && !Impl::setCompressed(wState)) || rsv23(src) || (getOpCode(src) > 2 && getOpCode(src) < 8) || + getOpCode(src) > 10 || (getOpCode(src) > 2 && (!isFin(src) || payloadLength(src) > 125))) { + Impl::forceClose(wState); + return; + } + + if (payloadLength(src) < 126) { + if (consumeMessage(payloadLength(src), src, length, wState)) { + return; + } + } else if (payloadLength(src) == 126) { + if (length < MEDIUM_MESSAGE_HEADER) { + break; + } else if(consumeMessage(ntohs(*(uint16_t *) &src[2]), src, length, wState)) { + return; + } + } else if (length < LONG_MESSAGE_HEADER) { + break; + } else if (consumeMessage(be64toh(*(uint64_t *) &src[2]), src, length, wState)) { + return; + } + } + if (length) { + memcpy(wState->state.spill, src, length); + wState->state.spillLength = length; + } + } else if (consumeContinuation(src, length, wState)) { + goto parseNext; + } + } + + static const int CONSUME_POST_PADDING = 4; + static const int CONSUME_PRE_PADDING = LONG_MESSAGE_HEADER - 1; +}; + +} + +#endif // WEBSOCKETPROTOCOL_UWS_H diff --git a/node_modules/uws/src/addon.cpp b/node_modules/uws/src/addon.cpp new file mode 100644 index 0000000..15e6905 --- /dev/null +++ b/node_modules/uws/src/addon.cpp @@ -0,0 +1,24 @@ +#include "../src/uWS.h" +#include "addon.h" +#include "http.h" + +void Main(Local exports) { + Isolate *isolate = exports->GetIsolate(); + + exports->Set(String::NewFromUtf8(isolate, "server"), Namespace(isolate).object); + exports->Set(String::NewFromUtf8(isolate, "client"), Namespace(isolate).object); + exports->Set(String::NewFromUtf8(isolate, "httpServer"), HttpServer::getHttpServer(isolate)); + + NODE_SET_METHOD(exports, "setUserData", setUserData); + NODE_SET_METHOD(exports, "getUserData", getUserData); + NODE_SET_METHOD(exports, "clearUserData", clearUserData); + NODE_SET_METHOD(exports, "getAddress", getAddress); + + NODE_SET_METHOD(exports, "transfer", transfer); + NODE_SET_METHOD(exports, "upgrade", upgrade); + NODE_SET_METHOD(exports, "connect", connect); + NODE_SET_METHOD(exports, "setNoop", setNoop); + registerCheck(isolate); +} + +NODE_MODULE(uws, Main) diff --git a/node_modules/uws/src/addon.h b/node_modules/uws/src/addon.h new file mode 100644 index 0000000..93a41e5 --- /dev/null +++ b/node_modules/uws/src/addon.h @@ -0,0 +1,464 @@ +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace v8; + +uWS::Hub hub(0, true); +uv_check_t check; +Persistent noop; + +void registerCheck(Isolate *isolate) { + uv_check_init((uv_loop_t *) hub.getLoop(), &check); + check.data = isolate; + uv_check_start(&check, [](uv_check_t *check) { + Isolate *isolate = (Isolate *) check->data; + HandleScope hs(isolate); + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, noop), 0, nullptr); + }); + uv_unref((uv_handle_t *) &check); +} + +class NativeString { + char *data; + size_t length; + char utf8ValueMemory[sizeof(String::Utf8Value)]; + String::Utf8Value *utf8Value = nullptr; +public: + NativeString(const Local &value) { + if (value->IsUndefined()) { + data = nullptr; + length = 0; + } else if (value->IsString()) { + utf8Value = new (utf8ValueMemory) String::Utf8Value(value); + data = (**utf8Value); + length = utf8Value->length(); + } else if (node::Buffer::HasInstance(value)) { + data = node::Buffer::Data(value); + length = node::Buffer::Length(value); + } else if (value->IsTypedArray()) { + Local arrayBufferView = Local::Cast(value); + ArrayBuffer::Contents contents = arrayBufferView->Buffer()->GetContents(); + length = contents.ByteLength(); + data = (char *) contents.Data(); + } else if (value->IsArrayBuffer()) { + Local arrayBuffer = Local::Cast(value); + ArrayBuffer::Contents contents = arrayBuffer->GetContents(); + length = contents.ByteLength(); + data = (char *) contents.Data(); + } else { + static char empty[] = ""; + data = empty; + length = 0; + } + } + + char *getData() {return data;} + size_t getLength() {return length;} + ~NativeString() { + if (utf8Value) { + utf8Value->~Utf8Value(); + } + } +}; + +struct GroupData { + Persistent connectionHandler, messageHandler, + disconnectionHandler, pingHandler, + pongHandler, errorHandler, httpRequestHandler, + httpUpgradeHandler, httpCancelledRequestCallback; + int size = 0; +}; + +template +void createGroup(const FunctionCallbackInfo &args) { + uWS::Group *group = hub.createGroup(args[0]->IntegerValue(), args[1]->IntegerValue()); + group->setUserData(new GroupData); + args.GetReturnValue().Set(External::New(args.GetIsolate(), group)); +} + +template +void deleteGroup(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + delete (GroupData *) group->getUserData(); + delete group; +} + +template +inline Local wrapSocket(uWS::WebSocket *webSocket, Isolate *isolate) { + return External::New(isolate, webSocket); +} + +template +inline uWS::WebSocket *unwrapSocket(Local external) { + return (uWS::WebSocket *) external->Value(); +} + +inline Local wrapMessage(const char *message, size_t length, uWS::OpCode opCode, Isolate *isolate) { + return opCode == uWS::OpCode::BINARY ? (Local) ArrayBuffer::New(isolate, (char *) message, length) : (Local) String::NewFromUtf8(isolate, message, String::kNormalString, length); +} + +template +inline Local getDataV8(uWS::WebSocket *webSocket, Isolate *isolate) { + return webSocket->getUserData() ? Local::New(isolate, *(Persistent *) webSocket->getUserData()) : Local::Cast(Undefined(isolate)); +} + +template +void getUserData(const FunctionCallbackInfo &args) { + args.GetReturnValue().Set(getDataV8(unwrapSocket(args[0].As()), args.GetIsolate())); +} + +template +void clearUserData(const FunctionCallbackInfo &args) { + uWS::WebSocket *webSocket = unwrapSocket(args[0].As()); + ((Persistent *) webSocket->getUserData())->Reset(); + delete (Persistent *) webSocket->getUserData(); +} + +template +void setUserData(const FunctionCallbackInfo &args) { + uWS::WebSocket *webSocket = unwrapSocket(args[0].As()); + if (webSocket->getUserData()) { + ((Persistent *) webSocket->getUserData())->Reset(args.GetIsolate(), args[1]); + } else { + webSocket->setUserData(new Persistent(args.GetIsolate(), args[1])); + } +} + +template +void getAddress(const FunctionCallbackInfo &args) +{ + typename uWS::WebSocket::Address address = unwrapSocket(args[0].As())->getAddress(); + Local array = Array::New(args.GetIsolate(), 3); + array->Set(0, Integer::New(args.GetIsolate(), address.port)); + array->Set(1, String::NewFromUtf8(args.GetIsolate(), address.address)); + array->Set(2, String::NewFromUtf8(args.GetIsolate(), address.family)); + args.GetReturnValue().Set(array); +} + +uv_handle_t *getTcpHandle(void *handleWrap) { + volatile char *memory = (volatile char *) handleWrap; + for (volatile uv_handle_t *tcpHandle = (volatile uv_handle_t *) memory; tcpHandle->type != UV_TCP + || tcpHandle->data != handleWrap || tcpHandle->loop != uv_default_loop(); tcpHandle = (volatile uv_handle_t *) memory) { + memory++; + } + return (uv_handle_t *) memory; +} + +struct SendCallbackData { + Persistent jsCallback; + Isolate *isolate; +}; + +template +void sendCallback(uWS::WebSocket *webSocket, void *data, bool cancelled, void *reserved) +{ + SendCallbackData *sc = (SendCallbackData *) data; + if (!cancelled) { + HandleScope hs(sc->isolate); + node::MakeCallback(sc->isolate, sc->isolate->GetCurrentContext()->Global(), Local::New(sc->isolate, sc->jsCallback), 0, nullptr); + } + sc->jsCallback.Reset(); + delete sc; +} + +template +void send(const FunctionCallbackInfo &args) +{ + uWS::OpCode opCode = (uWS::OpCode) args[2]->IntegerValue(); + NativeString nativeString(args[1]); + + SendCallbackData *sc = nullptr; + void (*callback)(uWS::WebSocket *, void *, bool, void *) = nullptr; + + if (args[3]->IsFunction()) { + callback = sendCallback; + sc = new SendCallbackData; + sc->jsCallback.Reset(args.GetIsolate(), Local::Cast(args[3])); + sc->isolate = args.GetIsolate(); + } + + unwrapSocket(args[0].As())->send(nativeString.getData(), + nativeString.getLength(), opCode, callback, sc); +} + +void connect(const FunctionCallbackInfo &args) { + uWS::Group *clientGroup = (uWS::Group *) args[0].As()->Value(); + NativeString uri(args[1]); + hub.connect(std::string(uri.getData(), uri.getLength()), new Persistent(args.GetIsolate(), args[2]), {}, 5000, clientGroup); +} + +struct Ticket { + uv_os_sock_t fd; + SSL *ssl; +}; + +void upgrade(const FunctionCallbackInfo &args) { + uWS::Group *serverGroup = (uWS::Group *) args[0].As()->Value(); + Ticket *ticket = (Ticket *) args[1].As()->Value(); + NativeString secKey(args[2]); + NativeString extensions(args[3]); + NativeString subprotocol(args[4]); + + // todo: move this check into core! + if (ticket->fd != INVALID_SOCKET) { + hub.upgrade(ticket->fd, secKey.getData(), ticket->ssl, extensions.getData(), extensions.getLength(), subprotocol.getData(), subprotocol.getLength(), serverGroup); + } else { + if (ticket->ssl) { + SSL_free(ticket->ssl); + } + } + delete ticket; +} + +void transfer(const FunctionCallbackInfo &args) { + // (_handle.fd OR _handle), SSL + uv_handle_t *handle = nullptr; + Ticket *ticket = new Ticket; + if (args[0]->IsObject()) { + uv_fileno((handle = getTcpHandle(args[0]->ToObject()->GetAlignedPointerFromInternalField(0))), (uv_os_fd_t *) &ticket->fd); + } else { + ticket->fd = args[0]->IntegerValue(); + } + + ticket->fd = dup(ticket->fd); + ticket->ssl = nullptr; + if (args[1]->IsExternal()) { + ticket->ssl = (SSL *) args[1].As()->Value(); + SSL_up_ref(ticket->ssl); + } + + // uv_close calls shutdown if not set on Windows + if (handle) { + // UV_HANDLE_SHARED_TCP_SOCKET + handle->flags |= 0x40000000; + } + + args.GetReturnValue().Set(External::New(args.GetIsolate(), ticket)); +} + +template +void onConnection(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *connectionCallback = &groupData->connectionHandler; + connectionCallback->Reset(isolate, Local::Cast(args[1])); + group->onConnection([isolate, connectionCallback, groupData](uWS::WebSocket *webSocket, uWS::HttpRequest req) { + groupData->size++; + HandleScope hs(isolate); + Local argv[] = {wrapSocket(webSocket, isolate)}; + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, *connectionCallback), 1, argv); + }); +} + +template +void onMessage(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *messageCallback = &groupData->messageHandler; + messageCallback->Reset(isolate, Local::Cast(args[1])); + group->onMessage([isolate, messageCallback](uWS::WebSocket *webSocket, const char *message, size_t length, uWS::OpCode opCode) { + HandleScope hs(isolate); + Local argv[] = {wrapMessage(message, length, opCode, isolate), + getDataV8(webSocket, isolate)}; + Local::New(isolate, *messageCallback)->Call(isolate->GetCurrentContext()->Global(), 2, argv); + }); +} + +template +void onPing(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *pingCallback = &groupData->pingHandler; + pingCallback->Reset(isolate, Local::Cast(args[1])); + group->onPing([isolate, pingCallback](uWS::WebSocket *webSocket, const char *message, size_t length) { + HandleScope hs(isolate); + Local argv[] = {wrapMessage(message, length, uWS::OpCode::PING, isolate), + getDataV8(webSocket, isolate)}; + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, *pingCallback), 2, argv); + }); +} + +template +void onPong(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *pongCallback = &groupData->pongHandler; + pongCallback->Reset(isolate, Local::Cast(args[1])); + group->onPong([isolate, pongCallback](uWS::WebSocket *webSocket, const char *message, size_t length) { + HandleScope hs(isolate); + Local argv[] = {wrapMessage(message, length, uWS::OpCode::PONG, isolate), + getDataV8(webSocket, isolate)}; + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, *pongCallback), 2, argv); + }); +} + +template +void onDisconnection(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *disconnectionCallback = &groupData->disconnectionHandler; + disconnectionCallback->Reset(isolate, Local::Cast(args[1])); + + group->onDisconnection([isolate, disconnectionCallback, groupData](uWS::WebSocket *webSocket, int code, char *message, size_t length) { + groupData->size--; + HandleScope hs(isolate); + Local argv[] = {wrapSocket(webSocket, isolate), + Integer::New(isolate, code), + wrapMessage(message, length, uWS::OpCode::CLOSE, isolate), + getDataV8(webSocket, isolate)}; + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, *disconnectionCallback), 4, argv); + }); +} + +void onError(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *errorCallback = &groupData->errorHandler; + errorCallback->Reset(isolate, Local::Cast(args[1])); + + group->onError([isolate, errorCallback](void *user) { + HandleScope hs(isolate); + Local argv[] = {Local::New(isolate, *(Persistent *) user)}; + node::MakeCallback(isolate, isolate->GetCurrentContext()->Global(), Local::New(isolate, *errorCallback), 1, argv); + + ((Persistent *) user)->Reset(); + delete (Persistent *) user; + }); +} + +template +void closeSocket(const FunctionCallbackInfo &args) { + NativeString nativeString(args[2]); + unwrapSocket(args[0].As())->close(args[1]->IntegerValue(), nativeString.getData(), nativeString.getLength()); +} + +template +void terminateSocket(const FunctionCallbackInfo &args) { + unwrapSocket(args[0].As())->terminate(); +} + +template +void closeGroup(const FunctionCallbackInfo &args) { + NativeString nativeString(args[2]); + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + group->close(args[1]->IntegerValue(), nativeString.getData(), nativeString.getLength()); +} + +template +void terminateGroup(const FunctionCallbackInfo &args) { + ((uWS::Group *) args[0].As()->Value())->terminate(); +} + +template +void broadcast(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + uWS::OpCode opCode = args[2]->BooleanValue() ? uWS::OpCode::BINARY : uWS::OpCode::TEXT; + NativeString nativeString(args[1]); + group->broadcast(nativeString.getData(), nativeString.getLength(), opCode); +} + +template +void prepareMessage(const FunctionCallbackInfo &args) { + uWS::OpCode opCode = (uWS::OpCode) args[1]->IntegerValue(); + NativeString nativeString(args[0]); + args.GetReturnValue().Set(External::New(args.GetIsolate(), uWS::WebSocket::prepareMessage(nativeString.getData(), nativeString.getLength(), opCode, false))); +} + +template +void sendPrepared(const FunctionCallbackInfo &args) { + unwrapSocket(args[0].As()) + ->sendPrepared((typename uWS::WebSocket::PreparedMessage *) args[1].As()->Value()); +} + +template +void finalizeMessage(const FunctionCallbackInfo &args) { + uWS::WebSocket::finalizeMessage((typename uWS::WebSocket::PreparedMessage *) args[0].As()->Value()); +} + +void forEach(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + Local cb = Local::Cast(args[1]); + group->forEach([isolate, &cb](uWS::WebSocket *webSocket) { + Local argv[] = { + getDataV8(webSocket, isolate) + }; + cb->Call(Null(isolate), 1, argv); + }); +} + +void getSize(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + GroupData *groupData = (GroupData *) group->getUserData(); + args.GetReturnValue().Set(Integer::New(args.GetIsolate(), groupData->size)); +} + +void startAutoPing(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + NativeString nativeString(args[2]); + group->startAutoPing(args[1]->IntegerValue(), std::string(nativeString.getData(), nativeString.getLength())); +} + +void setNoop(const FunctionCallbackInfo &args) { + noop.Reset(args.GetIsolate(), Local::Cast(args[0])); +} + +void listen(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args[0].As()->Value(); + hub.listen(args[1]->IntegerValue(), nullptr, 0, group); +} + +template +struct Namespace { + Local object; + Namespace (Isolate *isolate) { + object = Object::New(isolate); + NODE_SET_METHOD(object, "send", send); + NODE_SET_METHOD(object, "close", closeSocket); + NODE_SET_METHOD(object, "terminate", terminateSocket); + NODE_SET_METHOD(object, "prepareMessage", prepareMessage); + NODE_SET_METHOD(object, "sendPrepared", sendPrepared); + NODE_SET_METHOD(object, "finalizeMessage", finalizeMessage); + + Local group = Object::New(isolate); + NODE_SET_METHOD(group, "onConnection", onConnection); + NODE_SET_METHOD(group, "onMessage", onMessage); + NODE_SET_METHOD(group, "onDisconnection", onDisconnection); + + if (!isServer) { + NODE_SET_METHOD(group, "onError", onError); + } else { + NODE_SET_METHOD(group, "forEach", forEach); + NODE_SET_METHOD(group, "getSize", getSize); + NODE_SET_METHOD(group, "startAutoPing", startAutoPing); + NODE_SET_METHOD(group, "listen", listen); + } + + NODE_SET_METHOD(group, "onPing", onPing); + NODE_SET_METHOD(group, "onPong", onPong); + NODE_SET_METHOD(group, "create", createGroup); + NODE_SET_METHOD(group, "delete", deleteGroup); + NODE_SET_METHOD(group, "close", closeGroup); + NODE_SET_METHOD(group, "terminate", terminateGroup); + NODE_SET_METHOD(group, "broadcast", broadcast); + + object->Set(String::NewFromUtf8(isolate, "group"), group); + } +}; diff --git a/node_modules/uws/src/http.h b/node_modules/uws/src/http.h new file mode 100644 index 0000000..61c9d2e --- /dev/null +++ b/node_modules/uws/src/http.h @@ -0,0 +1,357 @@ +#include + +Persistent reqTemplate, resTemplate; +Persistent httpPersistent; + +uWS::HttpRequest *currentReq = nullptr; + +struct HttpServer { + + struct Request { + static void on(const FunctionCallbackInfo &args) { + NativeString eventName(args[0]); + if (std::string(eventName.getData(), eventName.getLength()) == "data") { + args.Holder()->SetInternalField(1, args[1]); + } else if (std::string(eventName.getData(), eventName.getLength()) == "end") { + args.Holder()->SetInternalField(2, args[1]); + } else { + std::cout << "Warning: req.on(" << std::string(eventName.getData(), eventName.getLength()) << ") is not implemented!" << std::endl; + } + args.GetReturnValue().Set(args.Holder()); + } + + static void headers(Local property, const PropertyCallbackInfo &args) { + uWS::HttpRequest *req = currentReq; + if (!req) { + std::cerr << "Warning: req.headers usage past request handler is not supported!" << std::endl; + } else { + NativeString nativeString(property); + uWS::Header header = req->getHeader(nativeString.getData(), nativeString.getLength()); + if (header) { + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) header.value, String::kNormalString, header.valueLength)); + } + } + } + + static void url(Local property, const PropertyCallbackInfo &args) { + args.GetReturnValue().Set(args.This()->GetInternalField(4)); + } + + static void method(Local property, const PropertyCallbackInfo &args) { + //std::cout << "method" << std::endl; + long methodId = ((long) args.This()->GetAlignedPointerFromInternalField(3)) >> 1; + switch (methodId) { + case uWS::HttpMethod::METHOD_GET: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "GET", String::kNormalString, 3)); + break; + case uWS::HttpMethod::METHOD_PUT: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "PUT", String::kNormalString, 3)); + break; + case uWS::HttpMethod::METHOD_POST: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "POST", String::kNormalString, 4)); + break; + case uWS::HttpMethod::METHOD_HEAD: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "HEAD", String::kNormalString, 4)); + break; + case uWS::HttpMethod::METHOD_PATCH: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "PATCH", String::kNormalString, 5)); + break; + case uWS::HttpMethod::METHOD_TRACE: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "TRACE", String::kNormalString, 5)); + break; + case uWS::HttpMethod::METHOD_DELETE: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "DELETE", String::kNormalString, 6)); + break; + case uWS::HttpMethod::METHOD_OPTIONS: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "OPTIONS", String::kNormalString, 7)); + break; + case uWS::HttpMethod::METHOD_CONNECT: + args.GetReturnValue().Set(String::NewFromOneByte(args.GetIsolate(), (uint8_t *) "CONNECT", String::kNormalString, 7)); + break; + } + } + + // placeholders + static void unpipe(const FunctionCallbackInfo &args) { + //std::cout << "req.unpipe called" << std::endl; + } + + static void resume(const FunctionCallbackInfo &args) { + //std::cout << "req.resume called" << std::endl; + } + + static void socket(const FunctionCallbackInfo &args) { + // return new empty object + args.GetReturnValue().Set(Object::New(args.GetIsolate())); + } + + static Local getTemplateObject(Isolate *isolate) { + Local reqTemplateLocal = FunctionTemplate::New(isolate); + reqTemplateLocal->SetClassName(String::NewFromUtf8(isolate, "uws.Request")); + reqTemplateLocal->InstanceTemplate()->SetInternalFieldCount(5); + reqTemplateLocal->PrototypeTemplate()->SetAccessor(String::NewFromUtf8(isolate, "url"), Request::url); + reqTemplateLocal->PrototypeTemplate()->SetAccessor(String::NewFromUtf8(isolate, "method"), Request::method); + reqTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "on"), FunctionTemplate::New(isolate, Request::on)); + reqTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "unpipe"), FunctionTemplate::New(isolate, Request::unpipe)); + reqTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "resume"), FunctionTemplate::New(isolate, Request::resume)); + reqTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "socket"), FunctionTemplate::New(isolate, Request::socket)); + + Local reqObjectLocal = reqTemplateLocal->GetFunction()->NewInstance(); + + Local headersTemplate = ObjectTemplate::New(isolate); + headersTemplate->SetNamedPropertyHandler(Request::headers); + + reqObjectLocal->Set(String::NewFromUtf8(isolate, "headers"), headersTemplate->NewInstance()); + return reqObjectLocal; + } + }; + + struct Response { + static void on(const FunctionCallbackInfo &args) { + NativeString eventName(args[0]); + if (std::string(eventName.getData(), eventName.getLength()) == "close") { + args.Holder()->SetInternalField(1, args[1]); + } else { + std::cout << "Warning: res.on(" << std::string(eventName.getData(), eventName.getLength()) << ") is not implemented!" << std::endl; + } + args.GetReturnValue().Set(args.Holder()); + } + + static void end(const FunctionCallbackInfo &args) { + uWS::HttpResponse *res = (uWS::HttpResponse *) args.Holder()->GetAlignedPointerFromInternalField(0); + if (res) { + NativeString nativeString(args[0]); + + ((Persistent *) &res->userData)->Reset(); + ((Persistent *) &res->userData)->~Persistent(); + ((Persistent *) &res->extraUserData)->Reset(); + ((Persistent *) &res->extraUserData)->~Persistent(); + res->end(nativeString.getData(), nativeString.getLength()); + } + } + + // todo: this is slow + static void writeHead(const FunctionCallbackInfo &args) { + uWS::HttpResponse *res = (uWS::HttpResponse *) args.Holder()->GetAlignedPointerFromInternalField(0); + if (res) { + std::string head = "HTTP/1.1 " + std::to_string(args[0]->IntegerValue()) + " "; + + if (args.Length() > 1 && args[1]->IsString()) { + NativeString statusMessage(args[1]); + head.append(statusMessage.getData(), statusMessage.getLength()); + } else { + head += "OK"; + } + + if (args[args.Length() - 1]->IsObject()) { + Local headersObject = args[args.Length() - 1]->ToObject(); + Local headers = headersObject->GetOwnPropertyNames(); + for (int i = 0; i < headers->Length(); i++) { + Local key = headers->Get(i); + Local value = headersObject->Get(key); + + NativeString nativeKey(key); + NativeString nativeValue(value); + + head += "\r\n"; + head.append(nativeKey.getData(), nativeKey.getLength()); + head += ": "; + head.append(nativeValue.getData(), nativeValue.getLength()); + } + } + + head += "\r\n\r\n"; + res->write(head.data(), head.length()); + } + } + + // todo: if not writeHead called before then should write implicit headers + static void write(const FunctionCallbackInfo &args) { + uWS::HttpResponse *res = (uWS::HttpResponse *) args.Holder()->GetAlignedPointerFromInternalField(0); + + if (res) { + NativeString nativeString(args[0]); + res->write(nativeString.getData(), nativeString.getLength()); + } + } + + static void setHeader(const FunctionCallbackInfo &args) { + //std::cout << "res.setHeader called" << std::endl; + } + + static void getHeader(const FunctionCallbackInfo &args) { + //std::cout << "res.getHeader called" << std::endl; + } + + static Local getTemplateObject(Isolate *isolate) { + Local resTemplateLocal = FunctionTemplate::New(isolate); + resTemplateLocal->SetClassName(String::NewFromUtf8(isolate, "uws.Response")); + resTemplateLocal->InstanceTemplate()->SetInternalFieldCount(5); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "end"), FunctionTemplate::New(isolate, Response::end)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "writeHead"), FunctionTemplate::New(isolate, Response::writeHead)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "write"), FunctionTemplate::New(isolate, Response::write)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "on"), FunctionTemplate::New(isolate, Response::on)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "setHeader"), FunctionTemplate::New(isolate, Response::setHeader)); + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getHeader"), FunctionTemplate::New(isolate, Response::getHeader)); + return resTemplateLocal->GetFunction()->NewInstance(); + } + }; + + // todo: wrap everything up - most important function to get correct + static void createServer(const FunctionCallbackInfo &args) { + + // todo: delete this on destructor + uWS::Group *group = hub.createGroup(); + group->setUserData(new GroupData); + GroupData *groupData = (GroupData *) group->getUserData(); + + Isolate *isolate = args.GetIsolate(); + Persistent *httpRequestCallback = &groupData->httpRequestHandler; + httpRequestCallback->Reset(isolate, Local::Cast(args[0])); + group->onHttpRequest([isolate, httpRequestCallback](uWS::HttpResponse *res, uWS::HttpRequest req, char *data, size_t length, size_t remainingBytes) { + HandleScope hs(isolate); + + currentReq = &req; + + Local reqObject = Local::New(isolate, reqTemplate)->Clone(); + reqObject->SetAlignedPointerInInternalField(0, &req); + new (&res->extraUserData) Persistent(isolate, reqObject); + + Local resObject = Local::New(isolate, resTemplate)->Clone(); + resObject->SetAlignedPointerInInternalField(0, res); + new (&res->userData) Persistent(isolate, resObject); + + // store url & method (needed by Koa and Express) + long methodId = req.getMethod() << 1; + reqObject->SetAlignedPointerInInternalField(3, (void *) methodId); + reqObject->SetInternalField(4, String::NewFromOneByte(isolate, (uint8_t *) req.getUrl().value, String::kNormalString, req.getUrl().valueLength)); + + Local argv[] = {reqObject, resObject}; + Local::New(isolate, *httpRequestCallback)->Call(isolate->GetCurrentContext()->Global(), 2, argv); + + if (length) { + Local dataCallback = reqObject->GetInternalField(1); + if (!dataCallback->IsUndefined()) { + Local argv[] = {ArrayBuffer::New(isolate, data, length)}; + Local::Cast(dataCallback)->Call(isolate->GetCurrentContext()->Global(), 1, argv); + } + + if (!remainingBytes) { + Local endCallback = reqObject->GetInternalField(2); + if (!endCallback->IsUndefined()) { + Local::Cast(endCallback)->Call(isolate->GetCurrentContext()->Global(), 0, nullptr); + } + } + } + + currentReq = nullptr; + reqObject->SetAlignedPointerInInternalField(0, nullptr); + }); + + group->onCancelledHttpRequest([isolate](uWS::HttpResponse *res) { + HandleScope hs(isolate); + + // mark res as invalid + Local resObject = Local::New(isolate, *(Persistent *) &res->userData); + resObject->SetAlignedPointerInInternalField(0, nullptr); + + // mark req as invalid + Local reqObject = Local::New(isolate, *(Persistent *) &res->extraUserData); + reqObject->SetAlignedPointerInInternalField(0, nullptr); + + // emit res 'close' on aborted response + Local closeCallback = resObject->GetInternalField(1); + if (!closeCallback->IsUndefined()) { + Local::Cast(closeCallback)->Call(isolate->GetCurrentContext()->Global(), 0, nullptr); + } + + ((Persistent *) &res->userData)->Reset(); + ((Persistent *) &res->userData)->~Persistent(); + ((Persistent *) &res->extraUserData)->Reset(); + ((Persistent *) &res->extraUserData)->~Persistent(); + }); + + group->onHttpData([isolate](uWS::HttpResponse *res, char *data, size_t length, size_t remainingBytes) { + Local reqObject = Local::New(isolate, *(Persistent *) res->extraUserData); + + Local dataCallback = reqObject->GetInternalField(1); + if (!dataCallback->IsUndefined()) { + Local argv[] = {ArrayBuffer::New(isolate, data, length)}; + Local::Cast(dataCallback)->Call(isolate->GetCurrentContext()->Global(), 1, argv); + } + + if (!remainingBytes) { + Local endCallback = reqObject->GetInternalField(2); + if (!endCallback->IsUndefined()) { + Local::Cast(endCallback)->Call(isolate->GetCurrentContext()->Global(), 0, nullptr); + } + } + }); + + Local newInstance; + if (!args.IsConstructCall()) { + args.GetReturnValue().Set(newInstance = Local::New(args.GetIsolate(), httpPersistent)->NewInstance()); + } else { + args.GetReturnValue().Set(newInstance = args.This()); + } + + newInstance->SetAlignedPointerInInternalField(0, group); + } + + static void on(const FunctionCallbackInfo &args) { + NativeString eventName(args[0]); + std::cout << "Warning: server.on(" << std::string(eventName.getData(), eventName.getLength()) << ") is not implemented!" << std::endl; + } + + static void listen(const FunctionCallbackInfo &args) { + uWS::Group *group = (uWS::Group *) args.Holder()->GetAlignedPointerFromInternalField(0); + std::cout << "listen: " << hub.listen(args[0]->IntegerValue(), nullptr, 0, group) << std::endl; + + if (args[args.Length() - 1]->IsFunction()) { + Local::Cast(args[args.Length() - 1])->Call(args.GetIsolate()->GetCurrentContext()->Global(), 0, nullptr); + } + } + + // var app = getExpressApp(express) + static void getExpressApp(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + if (args[0]->IsFunction()) { + Local express = Local::Cast(args[0]); + express->Get(String::NewFromUtf8(isolate, "request"))->ToObject()->SetPrototype(Local::New(args.GetIsolate(), reqTemplate)->GetPrototype()); + express->Get(String::NewFromUtf8(isolate, "response"))->ToObject()->SetPrototype(Local::New(args.GetIsolate(), resTemplate)->GetPrototype()); + + // also change app.listen? + + // change prototypes back? + + args.GetReturnValue().Set(express->NewInstance()); + } + } + + static void getResponsePrototype(const FunctionCallbackInfo &args) { + args.GetReturnValue().Set(Local::New(args.GetIsolate(), resTemplate)->GetPrototype()); + } + + static void getRequestPrototype(const FunctionCallbackInfo &args) { + args.GetReturnValue().Set(Local::New(args.GetIsolate(), reqTemplate)->GetPrototype()); + } + + static Local getHttpServer(Isolate *isolate) { + Local httpServer = FunctionTemplate::New(isolate, HttpServer::createServer); + httpServer->InstanceTemplate()->SetInternalFieldCount(1); + + httpServer->Set(String::NewFromUtf8(isolate, "createServer"), FunctionTemplate::New(isolate, HttpServer::createServer)); + httpServer->Set(String::NewFromUtf8(isolate, "getExpressApp"), FunctionTemplate::New(isolate, HttpServer::getExpressApp)); + httpServer->Set(String::NewFromUtf8(isolate, "getResponsePrototype"), FunctionTemplate::New(isolate, HttpServer::getResponsePrototype)); + httpServer->Set(String::NewFromUtf8(isolate, "getRequestPrototype"), FunctionTemplate::New(isolate, HttpServer::getRequestPrototype)); + httpServer->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "listen"), FunctionTemplate::New(isolate, HttpServer::listen)); + httpServer->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "on"), FunctionTemplate::New(isolate, HttpServer::on)); + + reqTemplate.Reset(isolate, Request::getTemplateObject(isolate)); + resTemplate.Reset(isolate, Response::getTemplateObject(isolate)); + + Local httpServerLocal = httpServer->GetFunction(); + httpPersistent.Reset(isolate, httpServerLocal); + return httpServerLocal; + } +}; diff --git a/node_modules/uws/src/uWS.h b/node_modules/uws/src/uWS.h new file mode 100644 index 0000000..40a0e40 --- /dev/null +++ b/node_modules/uws/src/uWS.h @@ -0,0 +1,6 @@ +#ifndef UWS_UWS_H +#define UWS_UWS_H + +#include "Hub.h" + +#endif // UWS_UWS_H -- cgit v1.2.3