From 67fdec20726e48ba3a934cb25bb30d47ec4a4f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yaroslav=20De=20La=20Pe=C3=B1a=20Smirnov?= Date: Wed, 29 Nov 2017 11:44:34 +0300 Subject: Initial commit, version 0.5.3 --- node_modules/uws/src/WebSocket.cpp | 405 +++++++++++++++++++++++++++++++++++++ 1 file changed, 405 insertions(+) create mode 100644 node_modules/uws/src/WebSocket.cpp (limited to 'node_modules/uws/src/WebSocket.cpp') diff --git a/node_modules/uws/src/WebSocket.cpp b/node_modules/uws/src/WebSocket.cpp new file mode 100644 index 0000000..89ac23a --- /dev/null +++ b/node_modules/uws/src/WebSocket.cpp @@ -0,0 +1,405 @@ +#include "WebSocket.h" +#include "Group.h" +#include "Hub.h" + +namespace uWS { + +/* + * Frames and sends a WebSocket message. + * + * Hints: Consider using any of the prepare function if any of their + * use cases match what you are trying to achieve (pub/sub, broadcast) + * + * Thread safe + * + */ +template +void WebSocket::send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved), void *callbackData) { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*nodeData->asyncMutex); + if (isClosed()) { + if (callback) { + callback(this, callbackData, true, nullptr); + } + return; + } +#endif + + const int HEADER_LENGTH = WebSocketProtocol>::LONG_MESSAGE_HEADER; + + struct TransformData { + OpCode opCode; + } transformData = {opCode}; + + struct WebSocketTransformer { + static size_t estimate(const char *data, size_t length) { + return length + HEADER_LENGTH; + } + + static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) { + return WebSocketProtocol>::formatMessage(dst, src, length, transformData.opCode, length, false); + } + }; + + sendTransformed((char *) message, length, (void(*)(void *, void *, bool, void *)) callback, callbackData, transformData); +} + +/* + * Prepares a single message for use with sendPrepared. + * + * Hints: Useful in cases where you need to send the same message to many + * recipients. Do not use when only sending one message. + * + * Thread safe + * + */ +template +typename WebSocket::PreparedMessage *WebSocket::prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved)) { + PreparedMessage *preparedMessage = new PreparedMessage; + preparedMessage->buffer = new char[length + 10]; + preparedMessage->length = WebSocketProtocol>::formatMessage(preparedMessage->buffer, data, length, opCode, length, compressed); + preparedMessage->references = 1; + preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback; + return preparedMessage; +} + +/* + * Prepares a batch of messages to send as one single TCP packet / syscall. + * + * Hints: Useful when doing pub/sub-like broadcasts where many recipients should receive many + * messages. Do not use if only sending one message. + * + * Thread safe + * + */ +template +typename WebSocket::PreparedMessage *WebSocket::prepareMessageBatch(std::vector &messages, std::vector &excludedMessages, OpCode opCode, bool compressed, void (*callback)(WebSocket *, void *, bool, void *)) +{ + // should be sent in! + size_t batchLength = 0; + for (size_t i = 0; i < messages.size(); i++) { + batchLength += messages[i].length(); + } + + PreparedMessage *preparedMessage = new PreparedMessage; + preparedMessage->buffer = new char[batchLength + 10 * messages.size()]; + + int offset = 0; + for (size_t i = 0; i < messages.size(); i++) { + offset += WebSocketProtocol>::formatMessage(preparedMessage->buffer + offset, messages[i].data(), messages[i].length(), opCode, messages[i].length(), compressed); + } + preparedMessage->length = offset; + preparedMessage->references = 1; + preparedMessage->callback = (void(*)(void *, void *, bool, void *)) callback; + return preparedMessage; +} + +/* + * Sends a prepared message. + * + * Hints: Used to improve broadcasting and similar use cases where the same + * message is sent to multiple recipients. Do not used if only sending one message + * in total. + * + * Warning: Modifies passed PreparedMessage and is thus not thread safe. Other + * data is also modified and it makes sense to not make this function thread-safe + * since it is a central part in broadcasting and other high-perf code paths. + * + */ +template +void WebSocket::sendPrepared(typename WebSocket::PreparedMessage *preparedMessage, void *callbackData) { + // todo: see if this can be made a transformer instead + preparedMessage->references++; + void (*callback)(void *webSocket, void *userData, bool cancelled, void *reserved) = [](void *webSocket, void *userData, bool cancelled, void *reserved) { + PreparedMessage *preparedMessage = (PreparedMessage *) userData; + bool lastReference = !--preparedMessage->references; + + if (preparedMessage->callback) { + preparedMessage->callback(webSocket, reserved, cancelled, (void *) lastReference); + } + + if (lastReference) { + delete [] preparedMessage->buffer; + delete preparedMessage; + } + }; + + // candidate for fixed size pool allocator + int memoryLength = sizeof(Queue::Message); + int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength); + + Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex); + messagePtr->data = preparedMessage->buffer; + messagePtr->length = preparedMessage->length; + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, preparedMessage, false, callbackData); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = preparedMessage; + messagePtr->reserved = callbackData; + } + } else { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, preparedMessage, true, callbackData); + } + } +} + +/* + * Decrements the reference count of passed PreparedMessage. On zero references + * the memory will be deleted. + * + * Hints: Used together with prepareMessage, prepareMessageBatch and similar calls. + * + * Warning: Will modify passed PrepareMessage and is thus not thread safe by itself. + * + */ +template +void WebSocket::finalizeMessage(typename WebSocket::PreparedMessage *preparedMessage) { + if (!--preparedMessage->references) { + delete [] preparedMessage->buffer; + delete preparedMessage; + } +} + +template +uS::Socket *WebSocket::onData(uS::Socket *s, char *data, size_t length) { + WebSocket *webSocket = static_cast *>(s); + + webSocket->hasOutstandingPong = false; + if (!webSocket->isShuttingDown()) { + webSocket->cork(true); + WebSocketProtocol>::consume(data, length, webSocket); + if (!webSocket->isClosed()) { + webSocket->cork(false); + } + } + + return webSocket; +} + +/* + * Immediately terminates this WebSocket. Will call onDisconnection of its Group. + * + * Hints: Close code will be 1006 and message will be empty. + * + */ +template +void WebSocket::terminate() { + +#ifdef UWS_THREADSAFE + std::lock_guard lockGuard(*nodeData->asyncMutex); + if (isClosed()) { + return; + } +#endif + + WebSocket::onEnd(this); +} + +/* + * Transfers this WebSocket from its current Group to specified Group. + * + * Receiving Group has to have called listen(uWS::TRANSFERS) prior. + * + * Hints: Useful to implement subprotocols on the same thread and Loop + * or to transfer WebSockets between threads at any point (dynamic load balancing). + * + * Warning: From the point of call to the point of onTransfer, this WebSocket + * is invalid and cannot be used. What you put in is not guaranteed to be what you + * get in onTransfer, the only guaranteed consistency is passed userData is the userData + * of given WebSocket in onTransfer. Use setUserData and getUserData to identify the WebSocket. + */ +template +void WebSocket::transfer(Group *group) { + Group::from(this)->removeWebSocket(this); + if (group->loop == Group::from(this)->loop) { + // fast path + this->nodeData = group; + Group::from(this)->addWebSocket(this); + Group::from(this)->transferHandler(this); + } else { + // slow path + uS::Socket::transfer((uS::NodeData *) group, [](Poll *p) { + WebSocket *webSocket = (WebSocket *) p; + Group::from(webSocket)->addWebSocket(webSocket); + Group::from(webSocket)->transferHandler(webSocket); + }); + } +} + +/* + * Immediately calls onDisconnection of its Group and begins a passive + * WebSocket closedown handshake in the background (might succeed or not, + * we don't care). + * + * Hints: Close code and message will be what you pass yourself. + * + */ +template +void WebSocket::close(int code, const char *message, size_t length) { + + // startTimeout is not thread safe + + static const int MAX_CLOSE_PAYLOAD = 123; + length = std::min(MAX_CLOSE_PAYLOAD, length); + Group::from(this)->removeWebSocket(this); + Group::from(this)->disconnectionHandler(this, code, (char *) message, length); + setShuttingDown(true); + + // todo: using the shared timer in the group, we can skip creating a new timer per socket + // only this line and the one in Hub::connect uses the timeout feature + startTimeout::onEnd>(); + + char closePayload[MAX_CLOSE_PAYLOAD + 2]; + int closePayloadLength = WebSocketProtocol>::formatClosePayload(closePayload, code, message, length); + send(closePayload, closePayloadLength, OpCode::CLOSE, [](WebSocket *p, void *data, bool cancelled, void *reserved) { + if (!cancelled) { + p->shutdown(); + } + }); +} + +template +void WebSocket::onEnd(uS::Socket *s) { + WebSocket *webSocket = static_cast *>(s); + + if (!webSocket->isShuttingDown()) { + Group::from(webSocket)->removeWebSocket(webSocket); + Group::from(webSocket)->disconnectionHandler(webSocket, 1006, nullptr, 0); + } else { + webSocket->cancelTimeout(); + } + + webSocket->template closeSocket>(); + + while (!webSocket->messageQueue.empty()) { + Queue::Message *message = webSocket->messageQueue.front(); + if (message->callback) { + message->callback(nullptr, message->callbackData, true, nullptr); + } + webSocket->messageQueue.pop(); + } + + webSocket->nodeData->clearPendingPollChanges(webSocket); +} + +template +bool WebSocket::handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + Group *group = Group::from(webSocket); + + if (opCode < 3) { + if (!remainingBytes && fin && !webSocket->fragmentBuffer.length()) { + if (webSocket->compressionStatus == WebSocket::CompressionStatus::COMPRESSED_FRAME) { + webSocket->compressionStatus = WebSocket::CompressionStatus::ENABLED; + data = group->hub->inflate(data, length, group->maxPayload); + if (!data) { + forceClose(webSocketState); + return true; + } + } + + if (opCode == 1 && !WebSocketProtocol>::isValidUtf8((unsigned char *) data, length)) { + forceClose(webSocketState); + return true; + } + + group->messageHandler(webSocket, data, length, (OpCode) opCode); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else { + webSocket->fragmentBuffer.append(data, length); + if (!remainingBytes && fin) { + length = webSocket->fragmentBuffer.length(); + if (webSocket->compressionStatus == WebSocket::CompressionStatus::COMPRESSED_FRAME) { + webSocket->compressionStatus = WebSocket::CompressionStatus::ENABLED; + webSocket->fragmentBuffer.append("...."); + data = group->hub->inflate((char *) webSocket->fragmentBuffer.data(), length, group->maxPayload); + if (!data) { + forceClose(webSocketState); + return true; + } + } else { + data = (char *) webSocket->fragmentBuffer.data(); + } + + if (opCode == 1 && !WebSocketProtocol>::isValidUtf8((unsigned char *) data, length)) { + forceClose(webSocketState); + return true; + } + + group->messageHandler(webSocket, data, length, (OpCode) opCode); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + webSocket->fragmentBuffer.clear(); + } + } + } else { + if (!remainingBytes && fin && !webSocket->controlTipLength) { + if (opCode == CLOSE) { + typename WebSocketProtocol>::CloseFrame closeFrame = WebSocketProtocol>::parseClosePayload(data, length); + webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length); + return true; + } else { + if (opCode == PING) { + webSocket->send(data, length, (OpCode) OpCode::PONG); + group->pingHandler(webSocket, data, length); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else if (opCode == PONG) { + group->pongHandler(webSocket, data, length); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } + } + } else { + webSocket->fragmentBuffer.append(data, length); + webSocket->controlTipLength += length; + + if (!remainingBytes && fin) { + char *controlBuffer = (char *) webSocket->fragmentBuffer.data() + webSocket->fragmentBuffer.length() - webSocket->controlTipLength; + if (opCode == CLOSE) { + typename WebSocketProtocol>::CloseFrame closeFrame = WebSocketProtocol>::parseClosePayload(controlBuffer, webSocket->controlTipLength); + webSocket->close(closeFrame.code, closeFrame.message, closeFrame.length); + return true; + } else { + if (opCode == PING) { + webSocket->send(controlBuffer, webSocket->controlTipLength, (OpCode) OpCode::PONG); + group->pingHandler(webSocket, controlBuffer, webSocket->controlTipLength); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } else if (opCode == PONG) { + group->pongHandler(webSocket, controlBuffer, webSocket->controlTipLength); + if (webSocket->isClosed() || webSocket->isShuttingDown()) { + return true; + } + } + } + + webSocket->fragmentBuffer.resize(webSocket->fragmentBuffer.length() - webSocket->controlTipLength); + webSocket->controlTipLength = 0; + } + } + } + + return false; +} + +template struct WebSocket; +template struct WebSocket; + +} -- cgit v1.2.3