From 67fdec20726e48ba3a934cb25bb30d47ec4a4f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaroslav=20De=20La=20Pe=C3=B1a=20Smirnov?= Date: Wed, 29 Nov 2017 11:44:34 +0300 Subject: Initial commit, version 0.5.3 --- node_modules/uws/src/Hub.cpp | 177 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 node_modules/uws/src/Hub.cpp (limited to 'node_modules/uws/src/Hub.cpp') diff --git a/node_modules/uws/src/Hub.cpp b/node_modules/uws/src/Hub.cpp new file mode 100644 index 0000000..771c263 --- /dev/null +++ b/node_modules/uws/src/Hub.cpp @@ -0,0 +1,177 @@ +#include "Hub.h" +#include "HTTPSocket.h" +#include + +namespace uWS { + +char *Hub::inflate(char *data, size_t &length, size_t maxPayload) { + dynamicInflationBuffer.clear(); + + inflationStream.next_in = (Bytef *) data; + inflationStream.avail_in = length; + + int err; + do { + inflationStream.next_out = (Bytef *) inflationBuffer; + inflationStream.avail_out = LARGE_BUFFER_SIZE; + err = ::inflate(&inflationStream, Z_FINISH); + if (!inflationStream.avail_in) { + break; + } + + dynamicInflationBuffer.append(inflationBuffer, LARGE_BUFFER_SIZE - inflationStream.avail_out); + } while (err == Z_BUF_ERROR && dynamicInflationBuffer.length() <= maxPayload); + + inflateReset(&inflationStream); + + if ((err != Z_BUF_ERROR && err != Z_OK) || dynamicInflationBuffer.length() > maxPayload) { + length = 0; + return nullptr; + } + + if (dynamicInflationBuffer.length()) { + dynamicInflationBuffer.append(inflationBuffer, LARGE_BUFFER_SIZE - inflationStream.avail_out); + + length = dynamicInflationBuffer.length(); + return (char *) dynamicInflationBuffer.data(); + } + + length = LARGE_BUFFER_SIZE - inflationStream.avail_out; + return inflationBuffer; +} + +void Hub::onServerAccept(uS::Socket *s) { + HttpSocket *httpSocket = new HttpSocket(s); + delete s; + + httpSocket->setState>(); + httpSocket->start(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + httpSocket->setNoDelay(true); + Group::from(httpSocket)->addHttpSocket(httpSocket); + Group::from(httpSocket)->httpConnectionHandler(httpSocket); +} + +void Hub::onClientConnection(uS::Socket *s, bool error) { + HttpSocket *httpSocket = (HttpSocket *) s; + + if (error) { + httpSocket->onEnd(httpSocket); + } else { + httpSocket->setState>(); + httpSocket->change(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + httpSocket->setNoDelay(true); + httpSocket->upgrade(nullptr, nullptr, 0, nullptr, 0, nullptr); + } +} + +bool Hub::listen(const char *host, int port, uS::TLS::Context sslContext, int options, Group *eh) { + if (!eh) { + eh = (Group *) this; + } + + if (uS::Node::listen(host, port, sslContext, options, (uS::NodeData *) eh, nullptr)) { + eh->errorHandler(port); + return false; + } + return true; +} + +bool Hub::listen(int port, uS::TLS::Context sslContext, int options, Group *eh) { + return listen(nullptr, port, sslContext, options, eh); +} + +uS::Socket *allocateHttpSocket(uS::Socket *s) { + return (uS::Socket *) new HttpSocket(s); +} + +void Hub::connect(std::string uri, void *user, std::map extraHeaders, int timeoutMs, Group *eh) { + if (!eh) { + eh = (Group *) this; + } + + size_t offset = 0; + std::string protocol = uri.substr(offset, uri.find("://")), hostname, portStr, path; + if ((offset += protocol.length() + 3) < uri.length()) { + hostname = uri.substr(offset, uri.find_first_of(":/", offset) - offset); + + offset += hostname.length(); + if (uri[offset] == ':') { + offset++; + portStr = uri.substr(offset, uri.find("/", offset) - offset); + } + + offset += portStr.length(); + if (uri[offset] == '/') { + path = uri.substr(++offset); + } + } + + if (hostname.length()) { + int port = 80; + bool secure = false; + if (protocol == "wss") { + port = 443; + secure = true; + } else if (protocol != "ws") { + eh->errorHandler(user); + } + + if (portStr.length()) { + port = stoi(portStr); + } + + HttpSocket *httpSocket = (HttpSocket *) uS::Node::connect(hostname.c_str(), port, secure, eh); + if (httpSocket) { + // startTimeout occupies the user + httpSocket->startTimeout::onEnd>(timeoutMs); + httpSocket->httpUser = user; + + std::string randomKey = "x3JJHMbDL1EzLkh9GBhXDw=="; +// for (int i = 0; i < 22; i++) { +// randomKey[i] = rand() % +// } + + httpSocket->httpBuffer = "GET /" + path + " HTTP/1.1\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: " + randomKey + "\r\n" + "Host: " + hostname + "\r\n" + + "Sec-WebSocket-Version: 13\r\n"; + + for (std::pair header : extraHeaders) { + httpSocket->httpBuffer += header.first + ": " + header.second + "\r\n"; + } + + httpSocket->httpBuffer += "\r\n"; + } else { + eh->errorHandler(user); + } + } else { + eh->errorHandler(user); + } +} + +void Hub::upgrade(uv_os_sock_t fd, const char *secKey, SSL *ssl, const char *extensions, size_t extensionsLength, const char *subprotocol, size_t subprotocolLength, Group *serverGroup) { + if (!serverGroup) { + serverGroup = &getDefaultGroup(); + } + + uS::Socket s((uS::NodeData *) serverGroup, serverGroup->loop, fd, ssl); + s.setNoDelay(true); + + // todo: skip httpSocket -> it cannot fail anyways! + HttpSocket *httpSocket = new HttpSocket(&s); + httpSocket->setState>(); + httpSocket->change(httpSocket->nodeData->loop, httpSocket, httpSocket->setPoll(UV_READABLE)); + bool perMessageDeflate; + httpSocket->upgrade(secKey, extensions, extensionsLength, subprotocol, subprotocolLength, &perMessageDeflate); + + WebSocket *webSocket = new WebSocket(perMessageDeflate, httpSocket); + delete httpSocket; + webSocket->setState>(); + webSocket->change(webSocket->nodeData->loop, webSocket, webSocket->setPoll(UV_READABLE)); + serverGroup->addWebSocket(webSocket); + serverGroup->connectionHandler(webSocket, {}); +} + +} -- cgit v1.2.3