From 67fdec20726e48ba3a934cb25bb30d47ec4a4f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaroslav=20De=20La=20Pe=C3=B1a=20Smirnov?= Date: Wed, 29 Nov 2017 11:44:34 +0300 Subject: Initial commit, version 0.5.3 --- node_modules/uws/src/Group.cpp | 263 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 node_modules/uws/src/Group.cpp (limited to 'node_modules/uws/src/Group.cpp') diff --git a/node_modules/uws/src/Group.cpp b/node_modules/uws/src/Group.cpp new file mode 100644 index 0000000..028b1a0 --- /dev/null +++ b/node_modules/uws/src/Group.cpp @@ -0,0 +1,263 @@ +#include "Group.h" +#include "Hub.h" + +namespace uWS { + +template +void Group::setUserData(void *user) { + this->userData = user; +} + +template +void *Group::getUserData() { + return userData; +} + +template +void Group::timerCallback(Timer *timer) { + Group *group = (Group *) timer->getData(); + + group->forEach([](uWS::WebSocket *webSocket) { + if (webSocket->hasOutstandingPong) { + webSocket->terminate(); + } else { + webSocket->hasOutstandingPong = true; + } + }); + + if (group->userPingMessage.length()) { + group->broadcast(group->userPingMessage.data(), group->userPingMessage.length(), OpCode::TEXT); + } else { + group->broadcast(nullptr, 0, OpCode::PING); + } +} + +template +void Group::startAutoPing(int intervalMs, std::string userMessage) { + timer = new Timer(loop); + timer->setData(this); + timer->start(timerCallback, intervalMs, intervalMs); + userPingMessage = userMessage; +} + +template +void Group::addHttpSocket(HttpSocket *httpSocket) { + if (httpSocketHead) { + httpSocketHead->prev = httpSocket; + httpSocket->next = httpSocketHead; + } else { + httpSocket->next = nullptr; + // start timer + httpTimer = new Timer(hub->getLoop()); + httpTimer->setData(this); + httpTimer->start([](Timer *httpTimer) { + Group *group = (Group *) httpTimer->getData(); + group->forEachHttpSocket([](HttpSocket *httpSocket) { + if (httpSocket->missedDeadline) { + httpSocket->terminate(); + } else if (!httpSocket->outstandingResponsesHead) { + httpSocket->missedDeadline = true; + } + }); + }, 1000, 1000); + } + httpSocketHead = httpSocket; + httpSocket->prev = nullptr; +} + +template +void Group::removeHttpSocket(HttpSocket *httpSocket) { + if (iterators.size()) { + iterators.top() = httpSocket->next; + } + if (httpSocket->prev == httpSocket->next) { + httpSocketHead = nullptr; + httpTimer->stop(); + httpTimer->close(); + } else { + if (httpSocket->prev) { + ((HttpSocket *) httpSocket->prev)->next = httpSocket->next; + } else { + httpSocketHead = (HttpSocket *) httpSocket->next; + } + if (httpSocket->next) { + ((HttpSocket *) httpSocket->next)->prev = httpSocket->prev; + } + } +} + +template +void Group::addWebSocket(WebSocket *webSocket) { + if (webSocketHead) { + webSocketHead->prev = webSocket; + webSocket->next = webSocketHead; + } else { + webSocket->next = nullptr; + } + webSocketHead = webSocket; + webSocket->prev = nullptr; +} + +template +void Group::removeWebSocket(WebSocket *webSocket) { + if (iterators.size()) { + iterators.top() = webSocket->next; + } + if (webSocket->prev == webSocket->next) { + webSocketHead = nullptr; + } else { + if (webSocket->prev) { + ((WebSocket *) webSocket->prev)->next = webSocket->next; + } else { + webSocketHead = (WebSocket *) webSocket->next; + } + if (webSocket->next) { + ((WebSocket *) webSocket->next)->prev = webSocket->prev; + } + } +} + +template +Group::Group(int extensionOptions, unsigned int maxPayload, Hub *hub, uS::NodeData *nodeData) : uS::NodeData(*nodeData), maxPayload(maxPayload), hub(hub), extensionOptions(extensionOptions) { + connectionHandler = [](WebSocket *, HttpRequest) {}; + transferHandler = [](WebSocket *) {}; + messageHandler = [](WebSocket *, char *, size_t, OpCode) {}; + disconnectionHandler = [](WebSocket *, int, char *, size_t) {}; + pingHandler = pongHandler = [](WebSocket *, char *, size_t) {}; + errorHandler = [](errorType) {}; + httpRequestHandler = [](HttpResponse *, HttpRequest, char *, size_t, size_t) {}; + httpConnectionHandler = [](HttpSocket *) {}; + httpDisconnectionHandler = [](HttpSocket *) {}; + httpCancelledRequestHandler = [](HttpResponse *) {}; + httpDataHandler = [](HttpResponse *, char *, size_t, size_t) {}; + + this->extensionOptions |= CLIENT_NO_CONTEXT_TAKEOVER | SERVER_NO_CONTEXT_TAKEOVER; +} + +template +void Group::stopListening() { + if (isServer) { + if (user) { + // todo: we should allow one group to listen to many ports! + uS::ListenSocket *listenSocket = (uS::ListenSocket *) user; + + if (listenSocket->timer) { + listenSocket->timer->stop(); + listenSocket->timer->close(); + } + + listenSocket->closeSocket(); + + // mark as stopped listening (extra care?) + user = nullptr; + } + } + + if (async) { + async->close(); + } +} + +template +void Group::onConnection(std::function *, HttpRequest)> handler) { + connectionHandler = handler; +} + +template +void Group::onTransfer(std::function *)> handler) { + transferHandler = handler; +} + +template +void Group::onMessage(std::function *, char *, size_t, OpCode)> handler) { + messageHandler = handler; +} + +template +void Group::onDisconnection(std::function *, int, char *, size_t)> handler) { + disconnectionHandler = handler; +} + +template +void Group::onPing(std::function *, char *, size_t)> handler) { + pingHandler = handler; +} + +template +void Group::onPong(std::function *, char *, size_t)> handler) { + pongHandler = handler; +} + +template +void Group::onError(std::function handler) { + errorHandler = handler; +} + +template +void Group::onHttpConnection(std::function *)> handler) { + httpConnectionHandler = handler; +} + +template +void Group::onHttpRequest(std::function handler) { + httpRequestHandler = handler; +} + +template +void Group::onHttpData(std::function handler) { + httpDataHandler = handler; +} + +template +void Group::onHttpDisconnection(std::function *)> handler) { + httpDisconnectionHandler = handler; +} + +template +void Group::onCancelledHttpRequest(std::function handler) { + httpCancelledRequestHandler = handler; +} + +template +void Group::onHttpUpgrade(std::function *, HttpRequest)> handler) { + httpUpgradeHandler = handler; +} + +template +void Group::broadcast(const char *message, size_t length, OpCode opCode) { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*asyncMutex); +#endif + + typename WebSocket::PreparedMessage *preparedMessage = WebSocket::prepareMessage((char *) message, length, opCode, false); + forEach([preparedMessage](uWS::WebSocket *ws) { + ws->sendPrepared(preparedMessage); + }); + WebSocket::finalizeMessage(preparedMessage); +} + +template +void Group::terminate() { + forEach([](uWS::WebSocket *ws) { + ws->terminate(); + }); + stopListening(); +} + +template +void Group::close(int code, char *message, size_t length) { + forEach([code, message, length](uWS::WebSocket *ws) { + ws->close(code, message, length); + }); + stopListening(); + if (timer) { + timer->stop(); + timer->close(); + } +} + +template struct Group; +template struct Group; + +} -- cgit v1.2.3