diff --git a/src/AsyncWSocket.cpp b/src/AsyncWSocket.cpp new file mode 100644 index 000000000..b4aa9674e --- /dev/null +++ b/src/AsyncWSocket.cpp @@ -0,0 +1,1041 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +// A new experimental implementation of Async WebSockets client/server + +// We target C++17 capable toolchain +#if __cplusplus >= 201703L +#include "AsyncWSocket.h" +#if defined(ESP32) +#if (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)) +#include "literals.h" + +#define WS_MAX_HEADER_SIZE 16 + +constexpr const char WS_STR_CONNECTION[] = "Connection"; +constexpr const char WS_STR_VERSION[] = "Sec-WebSocket-Version"; +constexpr const char WS_STR_KEY[] = "Sec-WebSocket-Key"; +constexpr const char WS_STR_PROTOCOL[] = "Sec-WebSocket-Protocol"; + +// WSockServer worker task +constexpr const char WS_SRV_TASK[] = "WSSrvtask"; + +// cast enum class to uint (for bit set) +template +constexpr std::common_type_t> +enum2uint32(E e) { + return static_cast>>(e); +} + + +/** + * @brief apply mask key to supplied data byte-per-byte + * this is used for unaligned portions of data buffer where 32 bit ops can't be applied + * this function takes mask offset to rollover the key bytes + * + * @param mask - mask key + * @param mask_offset - offset byte for mask + * @param data - data to apply + * @param length - data block length + */ +inline void wsMaskPayloadPerByte(uint32_t mask, size_t mask_offset, char *data, size_t length) { + for (char* ptr = data; ptr != data + length; ++ptr) { + *ptr ^= reinterpret_cast(&mask)[mask_offset++]; // roll mask bytes + if (mask_offset == sizeof(mask)) + mask_offset = 0; + } + } + +/** + * @brief apply mask key to supplied data using 32 bit XOR + * + * @param mask - mask key + * @param mask_offset - offset byte for mask + * @param data - data to apply + * @param length - data block length + */ +void wsMaskPayload(uint32_t mask, size_t mask_offset, char *data, size_t length) { + /* + we could benefit from 32-bit xor to unmask the data. The great thing of esp32 is that it could do unaligned 32 bit memory access + while some other MCU does not (yes, RP2040, I'm talking about you!) So have to go hard way - do all operations 32-bit aligned to cover all supported MCUs + */ + + // If data size is so small that it does not make sense to use 32 bit aligned calculations, just use byte-by-byte version + if (length < 4 * sizeof(mask)){ + wsMaskPayloadPerByte(mask, mask_offset % 4, data, length); + } else { + // do 32-bit vectored calculations + + // get unaligned part size + const size_t head_remainder = reinterpret_cast(data) % sizeof(mask); + // set aligned head + char* data_aligned_head = head_remainder == 0 ? data : (data + sizeof(mask) - head_remainder); + // set aligned tail + char* const data_end = data + length; + char* const data_aligned_end = data_end - reinterpret_cast(data_end) % sizeof(mask); + + // unmask unaligned part at the begining + if (head_remainder) + wsMaskPayloadPerByte(mask, mask_offset % 4, data, head_remainder); + + // need a derived mask key in a 32 bit var which is rolled over by the appropriate offset for data position, our byte-by-byte function could help here to derive key + uint32_t shifted_mask{0}; + wsMaskPayloadPerByte(mask, (mask_offset + data_aligned_head - data) % sizeof(mask), reinterpret_cast(&shifted_mask), sizeof(mask)); + + // (un)mask the payload + do { + *reinterpret_cast(data_aligned_head) ^= shifted_mask; + data_aligned_head += sizeof(mask); + } while(data_aligned_head != data_aligned_end); + + // unmask the unalined remainder + wsMaskPayloadPerByte(mask, (mask_offset + (data_aligned_end - data)) % sizeof(mask), data_aligned_end, data_end - data_aligned_end); + } +} + +size_t webSocketSendHeader(AsyncClient *client, WSMessageFrame& frame) { + if (!client) { + return 0; + } + + size_t headLen = 2; + if (frame.len > 65535){ + headLen += 8; + } else if (frame.len > 125) { + headLen += 2; + } + if (frame.len && frame.mask) { + headLen += 4; + } + + size_t space = client->space(); + if (space < headLen) { + // Serial.println("SF 2"); + return 0; + } + space -= headLen; + + // header buffer + uint8_t buf[headLen]; + + buf[0] = static_cast(frame.msg->type) & 0x0F; + if (frame.msg->final()) { + buf[0] |= 0x80; + } + if (frame.len < 126) { + // 1 byte len + buf[1] = frame.len & 0x7F; + } else if (frame.len > 65535){ + // 8 byte len + buf[1] = 127; + uint32_t lenl = htonl(frame.len & 0xffffffff); + uint32_t lenh = htonl(frame.len >> 32); + memcpy(buf+2, &lenh, sizeof(lenh)); + memcpy(buf+6, &lenl, sizeof(lenl)); + } else { + // 2 byte len + buf[1] = 126; + *(uint16_t*)(buf+2) = htons(frame.len & 0xffff); + } + + if (frame.len && frame.mask) { + buf[1] |= 0x80; + memcpy(buf + (headLen - sizeof(frame.mask)), &frame.mask, sizeof(frame.mask)); + } + + size_t sent = client->add((const char*)buf, headLen); + //log_d("send ws header, hdr size:%u, body len:%u", headLen, frame.len); + // return size of a header added or 0 if any error + return sent == headLen ? sent : 0; +} + + +// ******** WSocket classes implementation ******** + +WSocketClient::WSocketClient(uint32_t id, AsyncWebServerRequest *request, WSocketClient::event_cb_t call_back, size_t msgsize, size_t qcapsize) : + id(id), + _client(request->client()), + _cb(call_back), + _max_msgsize(msgsize), + _max_qcap(qcapsize) +{ + _lastPong = millis(); + // disable connection timeout + _client->setRxTimeout(0); + // disable Nagle's algo + _client->setNoDelay(true); + // set AsyncTCP callbacks + _client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast(r)->_clientSend(len); }, this ); + _client->onDisconnect( [](void *r, AsyncClient *c) { reinterpret_cast(r)->_onDisconnect(c); }, this ); + _client->onTimeout( [](void *r, AsyncClient *c, uint32_t time) { (void)c; reinterpret_cast(r)->_onTimeout(time); }, this ); + _client->onData( [](void *r, AsyncClient *c, void *buf, size_t len) { (void)c; reinterpret_cast(r)->_onData(buf, len); }, this ); + _client->onPoll( [](void *r, AsyncClient *c) { reinterpret_cast(r)->_onPoll(c); }, this ); + _client->onError( [](void *r, AsyncClient *c, int8_t error) { (void)c; log_e("err:%d", error); }, this ); + // bind URL hash + setURLHash(request->url().c_str()); + + delete request; +} + +WSocketClient::~WSocketClient() { + if (_client){ + // remove callback here, 'cause _client's destructor will call it again + _client->onDisconnect(nullptr); + delete _client; + _client = nullptr; + } + if (_eventGroup){ + vEventGroupDelete(_eventGroup); + _eventGroup = nullptr; + } +} + +// ***** AsyncTCP callbacks ***** +//#ifdef NOTHING +// callback acknowledges sending pieces of data for outgoing frame +void WSocketClient::_clientSend(size_t acked_bytes){ + if (!_client || _connection == conn_state_t::disconnected) + return; + + /* + this method could be called from different threads - AsyncTCP's ack/poll and user thread when enqueing messages, + only AsyncTCP's ack is mandatory to execute since it carries acked data size, others could be ignored completely + if this call is already exucute in progress. Worse case it will catch up later on next poll + */ + + // create lock object but don't actually take the lock yet + std::unique_lock lock{_sendLock, std::defer_lock}; + + // for response data we need to control AsyncTCP's event queue and in-flight fragmentation. Sending small chunks could give lower latency, + // but flood asynctcp's queue and fragment socket buffer space for large responses. + // Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space. + // That way we could balance on having half the buffer in-flight while another half is filling up and minimizing events in asynctcp's Q + if (acked_bytes){ + //log_d("ack:%u/%u, sock space:%u", acked_bytes, _in_flight, sock_space); + _in_flight -= std::min(acked_bytes, _in_flight); + auto sock_space = _client->space(); + if (!sock_space){ + return; + } + //log_d("infl:%u, credits:%u", _in_flight, _in_flight_credit); + // check if we were waiting to ack our disconnection frame + if (!_in_flight && (_connection == conn_state_t::disconnecting)){ + //log_d("closing tcp-conn"); + // we are server, should close connection first as per https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.1 + // here we close from the app side, send TCP-FIN to the party and move to FIN_WAIT_1/2 states + _client->close(); + return; + } + + // if it's the ack call from AsyncTCP - wait for lock! + lock.lock(); + + } else { + // if there is no acked data - just quit if won't be able to grab a lock, we are already sending something + if (!lock.try_lock()) + return; + + //auto sock_space = _client->space(); + //log_d("no ack infl:%u, space:%u, data pending:%u", _in_flight, sock_space, (uint32_t)(_outFrame.len - _outFrame.index)); + // + if (!_client->space()) + return; + } + + // ignore the call if available sock space is smaller then acked data and we won't be able to fit message's ramainder there + // this will reduce AsyncTCP's event Q pressure under heavy load + if ((_outFrame.msg && (_outFrame.len - _outFrame.index > _client->space())) && (_client->space() < acked_bytes) ){ + //log_d("defer ws send call, in-flight:%u/%u", _in_flight, _client->space()); + return; + } + + // no message in transit and we have enough space in sockbuff - try to evict new msg from a Q + if (!_outFrame.msg && _client->space() > WS_MAX_HEADER_SIZE){ + if (_evictOutQueue()){ + // generate header and add to the socket buffer. todo: check returned size? + _in_flight += webSocketSendHeader(_client, _outFrame); + } else + return; // nothing to send now + } + + // if there is a pending _outFrame - send the data from there + while (_outFrame.msg){ + if (_outFrame.index < _outFrame.len){ + size_t payload_pend = _client->add(_outFrame.msg->getCurrentChunk().first + _outFrame.chunk_offset, _outFrame.msg->getCurrentChunk().second - _outFrame.chunk_offset); + // if no data was added to client's buffer then it's something wrong, we can't go on + if (!payload_pend){ + _client->abort(); + return; + } + _outFrame.index += payload_pend; + _outFrame.chunk_offset += payload_pend; + _in_flight += payload_pend; + //size_t l = _outFrame.len; + //log_d("add to sock:%u, fidx:%u/%u, infl:%u", payload_pend, (uint32_t)_outFrame.index, (uint32_t)_outFrame.len, _in_flight); + } + + if (_outFrame.index == _outFrame.len){ + // if we complete writing entire message, send the frame right away + if (!_client->send()) + _client->abort(); + + if (_outFrame.msg->type == WSFrameType_t::close){ + // if we just sent close frame, then change client state and purge out queue, we won't transmit anything from now on + // the connection will be terminated once all in-flight data are acked from the other side + _connection = conn_state_t::disconnecting; + _outFrame.msg.reset(); + _messageQueueOut.clear(); + return; + } + + // release _outFrame message here + _outFrame.msg.reset(); + + // no use case for this for now + //_sendEvent(event_t::msgSent); + + // if there is still buffer space available try to pull next msg from Q + if (_client->space() > WS_MAX_HEADER_SIZE && _evictOutQueue()){ + // generate header and add to the socket buffer + _in_flight += webSocketSendHeader(_client, _outFrame); + continue; + } else { + return; + } + } + + if (_client->space() <= WS_MAX_HEADER_SIZE){ + // we have exhausted socket buffer, send it and quit + if (!_client->send()) + _client->abort(); + return; + } + + // othewise it was chunked message object and current chunk has been complete + if (_outFrame.chunk_offset == _outFrame.msg->getCurrentChunk().second){ + // request a new one + size_t next_chunk_size = _outFrame.msg->getNextChunk(); + if (next_chunk_size == 0){ + // chunk is not ready yet, need to async wait and return for data later, we quit here and reevaluate on next ack or poll event from AsyncTCP + if (!_client->send()) _client->abort(); + return; + } else if (next_chunk_size == -1){ + // something is wrong! there would be no more chunked data but the message has not reached it's full size yet, can do nothing but close the coonections + log_e("Chunk data is incomlete!"); + _client->abort(); + return; + } + // go on with a loop + } else { + // can't be there? + } + } +} + +bool WSocketClient::_evictOutQueue(){ + // check if we have something in the Q and enough sock space to send a header at least + if (_messageQueueOut.size() && _client->space() > WS_MAX_HEADER_SIZE ){ + { + #ifdef ESP32 + std::unique_lock lockout(_outQlock); + #endif + _outFrame.msg.swap(_messageQueueOut.front()); + _messageQueueOut.pop_front(); + } + _outFrame.chunk_offset = _outFrame.index = 0; + _outFrame.len = _outFrame.msg->getSize(); + return true; + } + + return false; + // this function assumes it's callee will take care of actually sending the header and message body further +} + +void WSocketClient::_onTimeout(uint32_t time) { + if (!_client) { + return; + } + // Serial.println("onTime"); + (void)time; + _client->abort(); +} + +void WSocketClient::_onDisconnect(AsyncClient *c) { + _connection = conn_state_t::disconnected; + //log_d("TCP client disconnected"); + _sendEvent(event_t::disconnect); +} + +void WSocketClient::_onData(void *pbuf, size_t plen) { + //log_d("_onData, 0x%08" PRIx32 " len:%u", (uint32_t)pbuf, plen); + if (!pbuf || !plen || _connection == conn_state_t::disconnected) return; + char *data = (char *)pbuf; + + size_t pb_len = plen; + + while (plen){ + if (!_inFrame.msg){ + // it's a new frame, need to parse header data + size_t framelen; + uint16_t errcode; + std::tie(framelen, errcode) = _mkNewFrame(data, plen, _inFrame); + if (framelen < 2 || errcode){ + // got bad length or close code, initiate disconnect procedure + #ifdef ESP32 + std::unique_lock lockout(_outQlock); + #endif + _messageQueueOut.push_front( std::make_shared(errcode) ); + // send disconnect message now + _clientSend(); + return; + } + // receiving a new frame from here + data += std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size + plen -= std::min(framelen, plen); + } else { + // continuation of existing frame + size_t payload_len = std::min(static_cast(_inFrame.len - _inFrame.index), plen); + // unmask the payload + if (_inFrame.mask) + wsMaskPayload(_inFrame.mask, _inFrame.index, static_cast(data), payload_len); + + // todo: for now assume object will consume all the payload provided + _inFrame.msg->addChunk(data, payload_len, _inFrame.index); + _inFrame.index += payload_len; + data += payload_len; + plen -= payload_len; + } + + // if we got whole frame now + if (_inFrame.index == _inFrame.len){ + //log_d("cmplt msg len:%u", (uint32_t)_inFrame.len); + + if (_inFrame.msg->getStatusCode() == 1007){ + // this is a dummy/corrupted message, we discard it + _inFrame.msg.reset(); + continue; + } + + switch (_inFrame.msg->type){ + // received close message + case WSFrameType_t::close : { + if (_connection == conn_state_t::disconnecting){ + //log_d("recv close ack"); + // if it was ws-close ack - we can close TCP connection + _connection = conn_state_t::disconnected; + // normally we should call close() here and wait for other side also close tcp connection with TCP-FIN, but + // for various reasons ws clients could linger connection when received TCP-FIN not closing it from the app side (even after + // two side ws-close exchange, i.e. websocat, websocket-client) + // This would make server side TCP to stay in FIN_WAIT_2 state quite long time, let's call abort() here instead of close(), + // it is harsh but let other side know that nobody would talk to it any longer and let it reinstate a new connection if needed + // anyway we won't receive/send anything due to '_connection == conn_state_t::disconnected;' + _client->abort(); + _inFrame.msg.reset(); + return; + } + + // otherwise it's a close request from a peer - echo back close message as per https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1 + //log_d("recv client's ws-close req"); + { + #ifdef ESP32 + std::unique_lock lockin(_inQlock); + std::unique_lock lockout(_outQlock); + #endif + // push message to recv Q if it has body, client might use it to understand disconnection reason + if (_inFrame.len > 2) + _messageQueueIn.push_back(_inFrame.msg); + // purge the out Q and echo recieved frame back to client, once it's tcp-acked from the other side we can close tcp connection + _messageQueueOut.clear(); + _messageQueueOut.push_front(_inFrame.msg); + } + _inFrame.msg.reset(); + // send event only when message has body + if (_inFrame.len > 2) + _sendEvent(event_t::msgRecv); + break; + } + + case WSFrameType_t::ping : { + #ifdef ESP32 + std::unique_lock lock(_outQlock); + #endif + // just reply to ping, does user needs this ping message? + _messageQueueOut.emplace_front( std::make_shared>(WSFrameType_t::pong, true, _inFrame.msg->getData()) ); + _inFrame.msg.reset(); + // send frame is no other message is in progress + if (!_outFrame.msg) + _clientSend(); + break; + } + + default: { + // any other messages + { + #ifdef ESP32 + std::lock_guard lock(_inQlock); + #endif + _messageQueueIn.push_back(_inFrame.msg); + } + _inFrame.msg.reset(); + _sendEvent(event_t::msgRecv); + break; + } + } + } + } + + /* + Applying TCP window control here. In case if there are pending messages in the Q + we clamp window size gradually to push sending party back. The larger the Q grows + then more window is closed. This works pretty well for messages sized about or more + than TCP windows size (5,7k default for Arduino). It could prevent Q overflow and + sieze incoming data flow without blocking the entie network stack. Mostly usefull + with websocket worker where AsyncTCP thread is not blocked by user callbacks. + */ + if (_messageQueueIn.size()){ + _client->ackLater(); + size_t reduce_size = pb_len * _messageQueueIn.size() / _max_qcap; + _client->ack(pb_len - reduce_size); + _pending_ack += reduce_size; + //log_d("delay ack:%u, total pending:%u", reduce_size, _pending_ack); + } +} + +void WSocketClient::_onPoll(AsyncClient *c){ + /* + Window control - we open window deproportionally to Q size letting data flow a bit + */ + if (_pending_ack){ + size_t to_keep = _pending_ack * (_messageQueueIn.size() + 1) / _max_qcap; + _client->ack(_pending_ack - to_keep); + size_t bak = _pending_ack; + _pending_ack = to_keep; + //log_d("poll ack:%u, left:%u\n", bak - _pending_ack, _pending_ack); + } + _keepalive(); + // call send if no other message is in progress and Q is not empty somehow, + // otherwise rely on ack events + if (!_outFrame.msg && _messageQueueOut.size()) + _clientSend(); +} + +std::pair WSocketClient::_mkNewFrame(char* data, size_t len, WSMessageFrame& frame){ + if (len < 2) return {0, 1002}; // return protocol error + uint8_t opcode = data[0] & 0x0F; + bool final = data[0] & 0x80; + bool masked = data[1] & 0x80; + + // read frame size + frame.len = data[1] & 0x7F; + size_t offset = 2; // first 2 bytes +/* + Serial.print("ws hdr: "); + //Serial.println(frame.mask, HEX); + char buffer[10] = {}; // Buffer for hex conversion + char* ptr = data; + for (size_t i = 0; i != 10; ++i ) { + sprintf(buffer, "%02X", *ptr); // Convert to uppercase hex + Serial.print(buffer); + //Serial.print(" "); + ++ptr; + } + Serial.println(); +*/ + // find message size from header + if (frame.len == 126 && len >= 4) { + // two byte + frame.len = (data[2] << 8) | data[3]; + offset += 2; + } else if (frame.len == 127 && len >= 10) { + // four byte + frame.len = ntohl(*(uint32_t*)(data + 2)); // MSB + frame.len <<= 32; + frame.len |= ntohl(*(uint32_t*)(data + 6)); // LSB + offset += 8; + } + + //log_d("recv hdr, sock data:%u, msg body size:%u", len, frame.len); + + // if ws.close() is called, Safari sends a close frame with plen 2 and masked bit set. We must not try to read mask key from beyond packet size + if (masked && len >= offset + 4) { + // mask bytes order are LSB, so we can copy it as-is + frame.mask = *reinterpret_cast(data + offset); + //Serial.printf("mask key at %u, :0x", offset); + //Serial.println(frame.mask, HEX); + offset += 4; + } + + frame.index = frame.chunk_offset = 0; + + size_t bodylen = std::min(static_cast(frame.len), len - offset); + if (!bodylen){ + // if there is no body in message, then it must be a specific control message with no payload + _inFrame.msg = std::make_shared(static_cast(opcode)); + } else { + if (frame.len > _max_msgsize){ + // message is bigger than we are allowed to accept, create a dummy container for it, it will just discard all incoming data + _inFrame.msg = std::make_shared(static_cast(opcode), 1007); // code 'Invalid frame payload data' + offset += bodylen; + _inFrame.index = bodylen; + return {offset, 1009}; // code 'message too big' + } + + // check in-queue for overflow + if (_messageQueueIn.size() >= _max_qcap){ + log_w("q overflow, client id:%u, qsize:%u", id, _messageQueueIn.size()); + switch (_overflow_policy){ + case overflow_t::discard : + // silently discard incoming message + _inFrame.msg = std::make_shared(static_cast(opcode), 1007); // code 'Invalid frame payload data' + offset += bodylen; + _inFrame.index = bodylen; + return {offset, 0}; + + case overflow_t::disconnect : { + // discard incoming message and send close message + _inFrame.msg = std::make_shared(static_cast(opcode), 1007); // code 'Invalid frame payload data' + #ifdef ESP32 + std::lock_guard lock(_inQlock); + #endif + _messageQueueOut.push_front( std::make_shared(1011, "Server Q overflow") ); + offset += bodylen; + _inFrame.index = bodylen; + return {offset, 0}; + } + + case overflow_t::drophead : + _messageQueueIn.pop_front(); + break; + case overflow_t::droptail : + _messageQueueIn.pop_back(); + break; + + default:; + } + + _sendEvent(event_t::msgDropped); + } + + // let's unmask payload right in sock buff, later it could be consumed as raw data + if (masked){ + wsMaskPayload(_inFrame.mask, 0, data + offset, bodylen); + } + + // create a new container object that fits best for message type + switch (static_cast(opcode)){ + case WSFrameType_t::text : + // create a text message container consuming as much data as possible from current payload + _inFrame.msg = std::make_shared>(WSFrameType_t::text, final, (char*)(data + offset), bodylen); + break; + + case WSFrameType_t::close : { + uint16_t status_code = ntohs(*(uint16_t*)(data + offset)); + if (bodylen > 2){ + // create a text message container consuming as much data as possible from current payload + _inFrame.msg = std::make_shared(status_code, data + offset + 2, bodylen -2); // deduce 2 bytes of message code + } else { + // must be close message w/o body + _inFrame.msg = std::make_shared(status_code); + } + break; + } + + default: + _inFrame.msg = std::make_shared>>(static_cast(opcode), bodylen); + // copy as much data as it is available in current sock buff + // todo: for now assume object will consume all the payload provided + _inFrame.msg->addChunk(data + offset, bodylen, 0); + + } + offset += bodylen; + _inFrame.index = bodylen; + } + + //log_e("new msg frame size:%u, bodylen:%u", offset, bodylen); + // return the number of consumed data from input buffer + return {offset, 0}; +} + +WSocketClient::err_t WSocketClient::enqueueMessage(WSMessagePtr mptr){ + if (_connection != conn_state_t::connected) + return err_t::disconnected; + + if (_messageQueueOut.size() < _max_qcap){ + { + #ifdef ESP32 + std::lock_guard lock(_outQlock); + #endif + _messageQueueOut.emplace_back( std::move(mptr) ); + } + // send frame if no other message is in progress + if (!_outFrame.msg) + _clientSend(); + return err_t::ok; + } + + return err_t::outQfull; +} + +WSMessagePtr WSocketClient::dequeueMessage(){ + WSMessagePtr msg; + if (_messageQueueIn.size()){ + #ifdef ESP32 + std::unique_lock lock(_inQlock); + #endif + msg.swap(_messageQueueIn.front()); + _messageQueueIn.pop_front(); + } + /* + Window control - we open window deproportionally to Q size letting data flow once a message is deQ'd + */ + if (_pending_ack){ + if (!_messageQueueIn.size()){ + _client->ack(0xffff); // on empty Q we ack whatever is left (max TCP win size) + } else { + size_t ackpart =_pending_ack * (_max_qcap - _messageQueueIn.size()) / _max_qcap; + //log_d("ackdq:%u/%u", ackpart, _pending_ack); + _client->ack(ackpart); + _pending_ack -= ackpart; + } + } + return msg; +} + +WSMessagePtr WSocketClient::peekMessage(){ + return _messageQueueIn.size() ? _messageQueueIn.front() : WSMessagePtr(); +} + +WSocketClient::err_t WSocketClient::state() const { + if (_connection != conn_state_t::connected) return err_t::disconnected; + if (_messageQueueOut.size() >= _max_qcap && _messageQueueIn.size() >= _max_qcap ) return err_t::Qsfull; + if (_messageQueueIn.size() >= _max_qcap) return err_t::inQfull; + if (_messageQueueOut.size() >= _max_qcap) return err_t::outQfull; + return err_t::ok; +} + +WSocketClient::err_t WSocketClient::close(uint16_t code, const char *message){ + if (_connection != conn_state_t::connected) + return err_t::disconnected; + + #ifdef ESP32 + std::lock_guard lock(_outQlock); + #endif + if (message) + _messageQueueOut.emplace_front( std::make_shared(code, message) ); + else + _messageQueueOut.emplace_front( std::make_shared(code) ); + // send frame if no other message is in progress + if (!_outFrame.msg) + _clientSend(); + return err_t::ok; +} + +void WSocketClient::_sendEvent(event_t e){ + if (_eventGroup) + xEventGroupSetBits(_eventGroup, enum2uint32(e)); + if (_cb) + _cb(this, e); +} + +void WSocketClient::_keepalive(){ + if (_keepAlivePeriod && (millis() - _lastPong > _keepAlivePeriod)){ + enqueueMessage(std::make_shared< WSMessageContainer >(WSFrameType_t::pong, true, "WSocketClient Pong" )); + _lastPong = millis(); + } +} + + +// ***** WSocketServer implementation ***** + +bool WSocketServer::newClient(AsyncWebServerRequest *request){ + // remove expired clients first + _purgeClients(); + { + #ifdef ESP32 + std::lock_guard lock(clientslock); + #endif + _clients.emplace_back(getNextId(), request, + [this](WSocketClient *c, WSocketClient::event_t e){ + // server echo call + if (e == WSocketClient::event_t::msgRecv) serverEcho(c); + if (eventHandler) + eventHandler(c, e); + else + c->dequeueMessage(); }, // silently discard incoming messages when there is no callback set + msgsize, qcap); + _clients.back().setOverflowPolicy(_overflow_policy); + _clients.back().setKeepAlive(_keepAlivePeriod); + } + if (eventHandler) + eventHandler(&_clients.back(), WSocketClient::event_t::connect); + return true; +} + +void WSocketServer::handleRequest(AsyncWebServerRequest *request) { + if (!request->hasHeader(WS_STR_VERSION) || !request->hasHeader(WS_STR_KEY)) { + request->send(400); + return; + } + + if (_handshakeHandler != nullptr) { + if (!_handshakeHandler(request)) { + request->send(401); + return; + } + } + const AsyncWebHeader *version = request->getHeader(WS_STR_VERSION); + if (version->value().compareTo(asyncsrv::T_13) != 0) { + AsyncWebServerResponse *response = request->beginResponse(400); + response->addHeader(WS_STR_VERSION, asyncsrv::T_13); + request->send(response); + return; + } + const AsyncWebHeader *key = request->getHeader(WS_STR_KEY); + AsyncWebServerResponse *response = new AsyncWebSocketResponse(key->value(), [this](AsyncWebServerRequest *r){ return newClient(r); }); + if (response == NULL) { + request->abort(); + return; + } + if (request->hasHeader(WS_STR_PROTOCOL)) { + const AsyncWebHeader *protocol = request->getHeader(WS_STR_PROTOCOL); + // ToDo: check protocol + response->addHeader(WS_STR_PROTOCOL, protocol->value()); + } + + request->send(response); +} + +bool WSocketServer::canHandle(AsyncWebServerRequest *request) const { + if (request->isWebSocketUpgrade()){ + auto url = request->url().c_str(); + auto i = std::find_if(_urlhashes.cbegin(), _urlhashes.cend(), [url](auto const &h){ return h == asyncsrv::hash_djb2a(url); }); + return (i != _urlhashes.cend()); + } + return false; +}; + +WSocketClient* WSocketServer::getClient(uint32_t id) { + auto iter = std::find_if(_clients.begin(), _clients.end(), [id](const WSocketClient &c) { return c.id == id; }); + if (iter != std::end(_clients)) + return &(*iter); + else + return nullptr; +} + +WSocketClient const* WSocketServer::getClient(uint32_t id) const { + const auto iter = std::find_if(_clients.cbegin(), _clients.cend(), [id](const WSocketClient &c) { return c.id == id; }); + if (iter != std::cend(_clients)) + return &(*iter); + else + return nullptr; +} + +WSocketClient::err_t WSocketServer::clientState(uint32_t id) const { + if (auto c = getClient(id)) + return c->state(); + else + return WSocketClient::err_t::disconnected; +}; + +WSocketServer::msgall_err_t WSocketServer::clientsState() const { + size_t cnt = std::count_if(std::cbegin(_clients), std::cend(_clients), [](const WSocketClient &c) { return c.state() == WSocketClient::err_t::ok; }); + if (!cnt) return msgall_err_t::none; + return cnt == _clients.size() ? msgall_err_t::ok : msgall_err_t::partial; +} + +WSocketServer::msgall_err_t WSocketServer::pingAll(const char *data, size_t len){ + size_t cnt{0}; + for (auto &c : _clients) { + if ( c.ping(data, len) == WSocketClient::err_t::ok) + ++cnt; + } + if (!cnt) + return msgall_err_t::none; + return cnt == _clients.size() ? msgall_err_t::ok : msgall_err_t::partial; +} + +WSocketClient::err_t WSocketServer::message(uint32_t clientid, WSMessagePtr m){ +if (WSocketClient *c = getClient(clientid)) + return c->enqueueMessage(std::move(m)); +else + return WSocketClient::err_t::disconnected; +} + +WSocketServer::msgall_err_t WSocketServer::messageAll(WSMessagePtr m){ + size_t cnt{0}; + for (auto &c : _clients) { + if ( c.enqueueMessage(m) == WSocketClient::err_t::ok) + ++cnt; + } + if (!cnt) + return msgall_err_t::none; + return cnt == _clients.size() ? msgall_err_t::ok : msgall_err_t::partial; +} + +WSocketServer::msgall_err_t WSocketServer::messageToEndpoint(uint32_t hash, WSMessagePtr m){ + size_t cnt{0}, cntt{0}; + for (auto &c : _clients){ + if (c.getURLHash() == hash){ + ++cntt; + if ( c.enqueueMessage(m) == WSocketClient::err_t::ok) + ++cnt; + } + } + if (!cnt) + return msgall_err_t::none; + return cnt == cntt ? msgall_err_t::ok : msgall_err_t::partial; +} + +void WSocketServer::_purgeClients(){ + std::lock_guard lock(clientslock); + // purge clients that are disconnected and with all messages consumed + _clients.remove_if([](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); }); +} + +size_t WSocketServer::activeClientsCount() const { + return std::count_if(std::begin(_clients), std::end(_clients), + [](const WSocketClient &c) { return c.connection() == WSocketClient::conn_state_t::connected; } + ); +} + +size_t WSocketServer::activeEndpointClientsCount(uint32_t hash) const { + return std::count_if(std::begin(_clients), std::end(_clients), + [hash](const WSocketClient &c) { return c.connection() == WSocketClient::conn_state_t::connected && c.getURLHash() == hash; } + ); +} + +void WSocketServer::serverEcho(WSocketClient *c){ + if (!_serverEcho) return; + auto m = c->peekMessage(); + if (m && (m->type == WSFrameType_t::text || m->type == WSFrameType_t::binary) ){ + // echo only text or bin messages + for (auto &i: _clients){ + if (!_serverEchoSplitHorizon || i.id != c->id){ + i.enqueueMessage(m); + } + } + } +} + +void WSocketServer::removeURLendpoint(std::string_view url){ + _urlhashes.erase(remove_if(_urlhashes.begin(), _urlhashes.end(), [url](auto const &v){ return v == asyncsrv::hash_djb2a(url); }), _urlhashes.end()); +} + + +// ***** WSMessageClose implementation ***** + +WSMessageClose::WSMessageClose (uint16_t status) : WSMessageContainer(WSFrameType_t::close, true), _status_code(status) { + // convert code to message body + uint16_t buff = htons (status); + container.append((char*)(&buff), 2); +}; + + +// ***** WSocketServerWorker implementation ***** + +bool WSocketServerWorker::newClient(AsyncWebServerRequest *request){ + { + #ifdef ESP32 + std::lock_guard lock (clientslock); + #endif + _clients.emplace_back(getNextId(), request, + [this](WSocketClient *c, WSocketClient::event_t e){ + //log_d("client event id:%u state:%u", c->id, c->state()); + // server echo call + if (e == WSocketClient::event_t::msgRecv) serverEcho(c); + if (_task_hndlr) xTaskNotifyGive(_task_hndlr); + }, + msgsize, qcap); + + // create events group where we'll pick events + _clients.back().createEventGroupHandle(); + _clients.back().setOverflowPolicy(getOverflowPolicy()); + _clients.back().setKeepAlive(_keepAlivePeriod); + _clients.back().setURLHash(request->url().c_str()); + xEventGroupSetBits(_clients.back().getEventGroupHandle(), enum2uint32(WSocketClient::event_t::connect)); + } + if (_task_hndlr) + xTaskNotifyGive(_task_hndlr); + return true; +} + +void WSocketServerWorker::start(uint32_t stack, UBaseType_t uxPriority, BaseType_t xCoreID){ + if (_task_hndlr) return; // we are already running + + xTaskCreatePinnedToCore( + [](void* pvParams){ static_cast(pvParams)->_taskRunner(); }, + WS_SRV_TASK, + stack, + (void *)this, + uxPriority, + &_task_hndlr, + xCoreID ); // == pdPASS; +} + +void WSocketServerWorker::stop(){ + if (_task_hndlr){ + vTaskDelete(_task_hndlr); + _task_hndlr = nullptr; + } +} + +void WSocketServerWorker::_taskRunner(){ + for (;;){ + + // go through our client's list looking for pending events, do not care to lock the list here, + // 'cause nobody would be able to remove anything from it but this loop and adding new client won't invalidate current iterator + auto it = _clients.begin(); + while (it != _clients.end()){ + EventBits_t uxBits; + + // check if this a new client + uxBits = xEventGroupClearBits(it->getEventGroupHandle(), enum2uint32(WSocketClient::event_t::connect) ); + if ( uxBits & enum2uint32(WSocketClient::event_t::connect) ){ + _ecb(&(*it), WSocketClient::event_t::connect); + } + + // check if 'inbound Q full' flag set + uxBits = xEventGroupClearBits(it->getEventGroupHandle(), enum2uint32(WSocketClient::event_t::inQfull) ); + if ( uxBits & enum2uint32(WSocketClient::event_t::inQfull) ){ + _ecb(&(*it), WSocketClient::event_t::inQfull); + } + + // check for dropped messages flag + uxBits = xEventGroupClearBits(it->getEventGroupHandle(), enum2uint32(WSocketClient::event_t::msgDropped) ); + if ( uxBits & enum2uint32(WSocketClient::event_t::msgDropped) ){ + _ecb(&(*it), WSocketClient::event_t::msgDropped); + } + + // process all the messages from inbound Q + xEventGroupClearBits(it->getEventGroupHandle(), enum2uint32(WSocketClient::event_t::msgRecv) ); + while (auto m{it->dequeueMessage()}){ + _mcb(m, it->id); + } + + // check for disconnected client - do not care for group bits, cause if it's deleted, we will destruct the client object + if (it->connection() == WSocketClient::conn_state_t::disconnected){ + // run a callback + _ecb(&(*it), WSocketClient::event_t::disconnect); + { + #ifdef ESP32 + std::lock_guard lock (clientslock); + #endif + it = _clients.erase(it); + } + } else { + // advance iterator + ++it; + } + } + + // wait for next event here, using counted notification we could do some dry-runs but won't miss any events + ulTaskNotifyTake(pdFALSE, portMAX_DELAY); + + // end of a task loop + } + vTaskDelete(NULL); +} + +#endif // (ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)) +#endif // defined(ESP32) +#endif // __cplusplus >= 201703L diff --git a/src/AsyncWSocket.h b/src/AsyncWSocket.h new file mode 100644 index 000000000..d45fe30a1 --- /dev/null +++ b/src/AsyncWSocket.h @@ -0,0 +1,1102 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +// A new experimental implementation of Async WebSockets client/server + +#pragma once +#if __cplusplus >= 201703L +#ifndef ESP32 +#warning "WSocket is now supported on ESP32 only" +#else + + +#include "AsyncWebSocket.h" +#include "freertos/FreeRTOS.h" + +namespace asyncsrv { +// literals hashing +// https://learnmoderncpp.com/2020/06/01/strings-as-switch-case-labels/ + +inline constexpr auto hash_djb2a(const std::string_view sv) { + uint32_t hash{ 5381 }; + for (unsigned char c : sv) { + hash = ((hash << 5) + hash) ^ c; + } + return hash; +} + +} + +// forward declaration for WSocketServer +class WSocketServer; + +/** + * @brief WebSocket message type + * + */ +enum class WSFrameType_t : uint8_t { // type field is 4 bits + continuation = 0, // fragment of a previous message + text, + binary, + close = 0x08, + ping, + pong +}; + +/** + * @brief Message transition state + * reflects message transmission state + * + */ +enum class WSMessageStatus_t { + empty = 0, // bare message, not usefull for anything yet + sending, // message in sending state + sent, // message transition has been complete + incomlete, // message is not complete. i.e. partially received + complete, // message is complete, no new data is expected + error // malformed message +}; + + + +/** + * @brief abstract WebSocket message + * + */ +class WSMessageGeneric { + friend class WSocketClient; + +public: + const WSFrameType_t type; + + WSMessageGeneric(WSFrameType_t type, bool final = true) : type(type), _final(final) {}; + virtual ~WSMessageGeneric(){}; + + // if this message in final fragment + bool final() const { return _final; } + + /** + * @brief Get the Size of the message + * + * @return size_t + */ + virtual size_t getSize() const = 0; + + /** + * @brief Get access to RO-data + * @note we cast it to char* 'cause of two reasons: + * - the LWIP's _tcp_write() function accepts const char* + * - websocket is mostly text anyway unless compressed + * @note for messages with type 'text' this lib will ALWAYS include NULL terminator at the end of data, NULL byte is NOT included in total size of the message returned by getSize() + * a care should be taken when accessing the data for messages with type 'binary', it won't have NUL terminator at the end and would be valid up to getSize() in length! + * @note buffer should be writtable so that client class could apply masking on data if needed + * @return char* const + */ + virtual const char* getData() = 0; + + /** + * @brief Get mutable access to underlying Buffer memory + * container might not allow this and return nullptr in this case + * + * @return char* + */ + virtual char* getBuffer(){ return nullptr; } + + /** + * @brief WebSocket message status code as defined in https://datatracker.ietf.org/doc/html/rfc6455#section-7.4 + * it could be used as integrity control or derived class features + * + * @return uint16_t 0 - default code for correct message + */ + virtual uint16_t getStatusCode() const { return 0; } + +protected: + /** Is this the last chunk in a fragmented message ?*/ + bool _final; + + /** + * @brief access message buffer in chunks + * this method is used internally by WSocketClient class when sending message. In simple case it just wraps around getDtataConst() call + * but could also be implemented in derived classes for chunked transfers + * + * @return std::pair + */ + virtual std::pair getCurrentChunk(){ return std::pair(getData(), getSize()); } + + /** + * @brief move to the next chunk of message data assuming current chunk has been consumed already + * @note nextChunk call switches the pointer, getCurrentChunk() would return new chunk afterwards + * + * @return int32_t - size of a next chunk of data + * @note if returned size is '-1' then no more data chunks are available + * @note if returned size is '0' then next chunk is not available yet, a call should be repeated later to retreive new chunk + */ + virtual int32_t getNextChunk(){ return -1; }; + + /** + * @brief add a chunk of data to the message + * it will NOT extend total message size, it's to reassemble a message taking several TCP packets + * + * @param data - data chunk pointer + * @param len - size of a chunk + * @param offset - an offset for chunk from begingn of message (@note offset is for calculation reference only, the chunk is always added to the end of current message data) + */ + virtual void addChunk(char* data, size_t len, size_t offset) = 0; + +}; + +using WSMessagePtr = std::shared_ptr; + +/** + * @brief templated Message container + * it can use any implementation of container classes to hold message data + * two main types to use are std::string for text messages and std::vector for binary + * @note Arduino's String class has limited functionality on accessing underlying buffer + * and resizing and should be avoided (but still possible) + * + * @tparam T + */ +template +class WSMessageContainer : public WSMessageGeneric { +protected: + T container; + +public: + + WSMessageContainer(WSFrameType_t t, bool final = true) : WSMessageGeneric(t, final) {} + + // variadic constructor for anything that T can be made of + template + WSMessageContainer (WSFrameType_t t, bool final, Args&&... args) : WSMessageGeneric(t, final), container(std::forward(args)...) {}; + + /** + * @copydoc WSMessageGeneric::getSize() + */ + size_t getSize() const override { + // specialisation for Arduino String + if constexpr(std::is_same_v>) + return container.length(); + // otherwise we assume either STL container is used, (i.e. std::vector or std::string) or derived class should implement same methods + if constexpr(!std::is_same_v>) + return container.size(); + }; + + /** + * @copydoc WSMessageGeneric::getData() + * @details though casted to const char* the data there MIGHT not be NULL-terminated string (depending on underlying container) + */ + const char* getData() override { + // specialization for Arduino String + if constexpr(std::is_same_v>) + return container.c_str(); + // otherwise we assume either STL container is used, (i.e. st::vector or std::string) or derived class should implement same methods + if constexpr(!std::is_same_v>) + return reinterpret_cast(container.data()); + } + + /** + * @copydoc WSMessageGeneric::getBuffer() + * @details though casted to const char* the data there MIGHT not be NULL-terminated string (depending on underlying container) + */ + char* getBuffer() override { + // specialization for Arduino String - it does not allow accessing underlying buffer, so return nullptr here + if constexpr(std::is_same_v>) + return nullptr; + // otherwise we assume either STL container is used, (i.e. st::vector or std::string) or derived class should implement same methods + if constexpr(!std::is_same_v>) + return reinterpret_cast(container.data()); + } + + // access message container object + T& getContainer(){ return container; } + +protected: + + /** + * @copydoc WSMessageGeneric::addChunk(char* data, size_t len, size_t offset) + * @details though casted to const char* the data there is NOT NULL-terminated string! + */ + void addChunk(char* data, size_t len, size_t offset) override { + // specialization for Arduino String + if constexpr(std::is_same_v>){ + container.concat(data, len); + } + + // specialization for std::string + if constexpr(std::is_same_v>){ + container.append(data, len); + } + + // specialization for std::vector + if constexpr(std::is_same_v, std::decay_t>){ + container.resize(len + offset); + memcpy(container.data(), data, len); + } + } + +}; + +/** + * @brief Control message - 'Close' + * + */ +class WSMessageClose : public WSMessageContainer { + const uint16_t _status_code; + +public: + /** + * @brief Construct Close message without the body + * + * @param status close code as defined in https://datatracker.ietf.org/doc/html/rfc6455#section-7.4 + */ + WSMessageClose (uint16_t status = 1000); + // variadic constructor for anything that std::string can be made of + template + WSMessageClose (uint16_t status, Args&&... args) : WSMessageContainer(WSFrameType_t::close, true, std::forward(args)...), _status_code(status) { + // convert code to message body + uint16_t buff = htons (status); + container.insert(0, (char*)(&buff), 2); + }; + + uint16_t getStatusCode() const override { return _status_code; } +}; + +/** + * @brief Dummy message that does not carry any data + * could be used as a container for bodyless control messages or + * specific cases (to gracefully handle oversised incoming messages) + */ +class WSMessageDummy : public WSMessageGeneric { + const uint16_t _code; +public: + explicit WSMessageDummy(WSFrameType_t type, uint16_t status_code = 0) : WSMessageGeneric(type, true), _code(status_code) {}; + size_t getSize() const override { return 0; }; + const char* getData() override { return nullptr; }; + uint16_t getStatusCode() const override { return _code; } + +protected: + void addChunk(char* data, size_t len, size_t offset) override {}; +}; + +/** + * @brief A message that carries a pointer to arbitrary blob of data + * it could be used to send large blocks of memory in zero-copy mode, + * i.e. avoid intermediary buffering copies. + * The concern here is that pointer MUST persist for the duration of message + * transfer period. Due to async nature it' it unknown how much time it would + * take to complete the transfer. For this a callback function is provided + * that triggers on object's destruction. It allows to get the event on + * transfer completetion and (possibly) release the ponter or do something else + * + */ +class WSMessageStaticBlob : public WSMessageGeneric { +public: + + // callback prototype that is triggered on message destruction + using event_cb_t = std::function; + + /** + * @brief Construct a new WSMessageStaticBlob object + * + * @param type WebSocket message type + * @param final WS final bit + * @param blob a pointer to blob object (must be casted to const char*) + * @param size a size of the object + * @param cb a callback function to be call when message complete sending/errored + * @param token a unique token to identify packet instance in callback + */ + explicit WSMessageStaticBlob(WSFrameType_t type, bool final, const char* blob, size_t size, event_cb_t cb = {}, uint32_t token = 0) + : WSMessageGeneric(type, final), _blob(blob), _size(size), _callback(cb), _token(token) {}; + + // d-tor, we call the callback here + ~WSMessageStaticBlob(){ if (_callback) _callback(WSMessageStatus_t::complete, _token); } + + size_t getSize() const override { return _size; }; + const char* getData() override { return _blob; }; + +private: + const char* _blob; + size_t _size; + event_cb_t _callback; + // unique token identifying the packet + uint32_t _token; + // it is not allowed to store anything here + void addChunk(char* data, size_t len, size_t offset) override final {}; + +}; + +/** + * @brief structure that owns the message (or fragment) while sending/receiving by WSocketClient + */ +struct WSMessageFrame { + /** Mask key */ + uint32_t mask; + /** Length of the current message/fragment to be transmitted. This equals the total length of the message if num == 0 && final == true */ + uint64_t len; + /** Offset of the payload data pending to be sent. Note: this is NOT websocket message fragment's size! */ + uint64_t index; + /** offset in the current chunk of data, used to when sending message in chunks (not WS fragments!), for single chunged messages chunk_offset and index are same */ + size_t chunk_offset; + // message object + WSMessagePtr msg; +}; + +/** + * @brief WebSocket client instance + * + */ +class WSocketClient { +public: + // TCP connection state + enum class conn_state_t { + connected, // connected and exchangin messages + disconnecting, // awaiting close ack + disconnected // ws peer is disconnected + }; + + /** + * @brief WebSocket Client Events + * + */ + enum class event_t { + connect = 0x01, + disconnect = 0x02, + msgRecv = 0x04, + msgSent = 0x08, + msgDropped = 0x10, + inQfull = 0x20 + }; + + // error codes + enum class err_t { + ok, // all correct + inQfull, // inbound Q is full, won't receive new messages + outQfull, // outboud Q is full, won't send new messages + Qsfull, // both Qs are full + disconnected, // peer connection is broken + na // client is not available + }; + + enum class overflow_t { + disconnect, + discard, + drophead, + droptail + }; + + // event callback alias + using event_cb_t = std::function; + + // Client connection ID (increments for each new connection for the given server) + const uint32_t id; + +private: + AsyncClient *_client; + event_cb_t _cb; + // incoming message size limit + size_t _max_msgsize; + // cummulative maximum of the data messages held in message queues, both in and out + size_t _max_qcap; + // hashed url bound to client's request + uint32_t _urlhash; + +public: + + /** + * @brief Construct a new WSocketClient object + * + * @param id - client's id tag + * @param request - AsyncWebServerRequest which is switching the protocol to WS + * @param call_back - event callback handler + * @param msgcap - incoming message size limit, if incoming msg advertizes larger size the connection would be dropped + * @param qcap - in/out queues sizes (in number of messages) + */ + WSocketClient(uint32_t id, AsyncWebServerRequest *request, WSocketClient::event_cb_t call_back, size_t msgsize = 8 * 1024, size_t qcap = 4); + ~WSocketClient(); + + /** + * @brief Enqueue message for sending + * + * @param msg rvalue reference to message object, i.e. WSocketClient will take the ownership of the object + * @return err_t enqueue error status + */ + err_t enqueueMessage(std::shared_ptr mptr); + + /** + * @brief retrieve message from inbout Q + * + * @return WSMessagePtr + * @note if Q is empty, then empty pointer is returned, so it should be validated + */ + WSMessagePtr dequeueMessage(); + + /** + * @brief access first avaiable message from inbound queue + * @note this call will NOT remove message from queue but access message in-place! + * + * @return WSMessagePtr if Q is empty, then empty pointer is returned + */ + WSMessagePtr peekMessage(); + + /** + * @brief return peer connection state + * + * @return conn_state_t + */ + conn_state_t connection() const { + return _connection; + } + + /** + * @brief get client's state + * returns error status if client is available to send/receive data + * + * @return err_t + */ + err_t state() const; + + AsyncClient *client() { + return _client; + } + + const AsyncClient *client() const { + return _client; + } + + /** + * @brief bind URL hash to client instance + * a hash could be used to differentiate clients attached via various URLs + * + * @param url + */ + void setURLHash(std::string_view url) { _urlhash = asyncsrv::hash_djb2a(url); } + + /** + * @brief get URL hash bound to client + * + * @return uint32_t + */ + uint32_t getURLHash() const { return _urlhash; } + + /** + * @brief check if client's bound URL matches string + * + * @param url + * @return true + * @return false + */ + bool matchURL(std::string_view url) { return _urlhash == asyncsrv::hash_djb2a(url); } + + /** + * @brief Set inbound queue overflow Policy + * + * @param policy inbound queue overflow policy + * + * @note overflow_t::disconnect (default) - disconnect client with respective close message when inbound Q is full. + * This is the default behavior in yubox-node-org, which is not silently discarding messages but instead closes the connection. + * The big issue with this behavior is that is can cause the UI to automatically re-create a new WS connection, which can be filled again, + * and so on, causing a resource exhaustion. + * + * @note overflow_t::discard - silently discard new messages if the queue is full (only a discard event would be generated to notify about drop) + * This is the default behavior in the original ESPAsyncWebServer library from me-no-dev. This behavior allows the best performance at the expense of unreliable message delivery in case the queue is full. + * + * @note overflow_t::drophead - drop the oldest message from inbound queue to fit new message + * @note overflow_t::droptail - drop most recent message from inbound queue to fit new message + * + */ + void setOverflowPolicy(overflow_t policy){ _overflow_policy = policy; } + + overflow_t getOverflowPolicy() const { return _overflow_policy; } + + // send control frames + err_t close(uint16_t code = 0, const char *message = NULL); + err_t ping(const char *data = NULL, size_t len = 0); + + size_t inQueueSize() const { return _messageQueueIn.size(); }; + size_t outQueueSize() const { return _messageQueueOut.size(); }; + + /** + * @brief Set the WebSOcket ping Keep A Live + * if set, client will send pong packet it's peer periodically to keep the connection alive + * ping does not require a reply from peer + * + * @param seconds + */ + void setKeepAlive(size_t seconds){ _keepAlivePeriod = seconds * 1000; }; + + // get keepalive value, seconds + size_t getKeepAlive() const { return _keepAlivePeriod / 1000; }; + + /** + * @brief access Event Group Handle for the client + * + * @return EventGroupHandle_t + */ + EventGroupHandle_t getEventGroupHandle(){ return _eventGroup; }; + + /** + * @brief Create a Event Group for the client + * if Event Group is created then client will set event bits in the group + * when various events are generate, i.e. message received, connect/disconnect, etc... + * + * @return EventGroupHandle_t + */ + EventGroupHandle_t createEventGroupHandle(){ + if (!_eventGroup) _eventGroup = xEventGroupCreate(); + return _eventGroup; + } + +private: + conn_state_t _connection{conn_state_t::connected}; + // frames in transit + WSMessageFrame _inFrame{}, _outFrame{}; + // message queues + std::deque< WSMessagePtr > _messageQueueIn; + std::deque< WSMessagePtr > _messageQueueOut; + EventGroupHandle_t _eventGroup{nullptr}; + +#ifdef ESP32 + // access mutex'es + std::mutex _sendLock; + mutable std::recursive_mutex _inQlock; + mutable std::recursive_mutex _outQlock; +#endif + + // inbound Q overflow behavior + overflow_t _overflow_policy{overflow_t::disconnect}; + + // amount of sent data in-flight, i.e. copied to socket buffer, but not acked yet from lwip side + size_t _in_flight{0}; + // counter for consumed data from tcp_pcbs, but delayd ack to hold the window + size_t _pending_ack{0}; + + // keepalive + unsigned long _keepAlivePeriod{0}, _lastPong; + + /** + * @brief go through out Q and send message data () + * @note this method will grab a mutex lock on outQ internally + * + */ + void _clientSend(size_t acked_bytes = 0); + + /** + * @brief expell next message from out Q (if any) and start sending it to the peer + * @note this function will generate and add msg header to socket buffer but assumes it's callee will take care of actually sending the header and message body further + * @note this function assumes that it's calle has already set _outFrameLock + * + */ + bool _evictOutQueue(); + + /** + * @brief try to parse ws header and create a new frame message + * + * @return size_t size of the parsed and consumed bytes from input in a new message + */ + std::pair _mkNewFrame(char* data, size_t len, WSMessageFrame& frame); + + /** + * @brief run a callback for event / set event group bits + * + * @param e + */ + void _sendEvent(event_t e); + + void _keepalive(); + + // AsyncTCP callbacks + void _onTimeout(uint32_t time); + void _onDisconnect(AsyncClient *c); + void _onData(void *pbuf, size_t plen); + void _onPoll(AsyncClient *c); +}; + +/** + * @brief WebServer Handler implementation that plays the role of a WebSocket server + * it inherits behavior of original WebSocket server and uses AsyncTCP's thread to + * run callbacks on incoming messages + * + */ +class WSocketServer : public AsyncWebHandler { +public: + // error enqueue to all + enum class msgall_err_t { + ok = 0, // message was enqueued for delivering to all clients + partial, // some of clients queueus are full, message was not enqueued there + none // no clients available, or all outbound queues are full, message discarded + }; + + /** + * @brief Construct a new WSocketServer object + * + * @param url - URL endpoint + * @param handler - event callback handler + * @param msgsize - max inbound message size (8k by default) + * @param qcap - queues size limit + */ + explicit WSocketServer(const char* url, WSocketClient::event_cb_t handler = {}, size_t msgsize = 8 * 1024, size_t qcap = 4) : eventHandler(handler), msgsize(msgsize), qcap(qcap) { _urlhashes.push_back(asyncsrv::hash_djb2a(url)); } + virtual ~WSocketServer(){}; + + /** + * @brief add additional URL to a list of websocket handlers + * + * @param url + */ + void addURLendpoint(std::string_view url){ _urlhashes.push_back(asyncsrv::hash_djb2a(url)); } + + /** + * @brief remove URL from a list of websocket handlers + * + * @param url + */ + void removeURLendpoint(std::string_view url); + + /** + * @brief clear list of handled URLs + * @note new client's won't be able to connect to server unless at least + * one new endpoint added + * + */ + void clearURLendpoints(){ _urlhashes.clear(); } + + /** + * @copydoc WSClient::setOverflowPolicy(overflow_t policy) + * @note default is 'disconnect' + */ + void setOverflowPolicy(WSocketClient::overflow_t policy){ _overflow_policy = policy; } + WSocketClient::overflow_t getOverflowPolicy() const { return _overflow_policy; } + + /** + * @brief Set Message Size limit for the incoming messages + * if peer tries to send us message larger then defined limit size, + * the message will be discarded and peer's connection would be closed with respective error code + * @note only new connections would be affected with changed value + * + * @param size + */ + void setMaxMessageSize(size_t size){ msgsize = size; } + size_t getMaxMessageSize(size_t size) const { return msgsize; } + + /** + * @brief Set in/out Message Queue Size + * + * @param size + */ + void setMessageQueueSize(size_t size){ qcap = size; } + size_t getMessageQueueSize(size_t size) const { return qcap; } + + /** + * @brief Set the WebSocket client Keep A Live + * if set, server will pong it's peers periodically to keep connections alive + * @note it does not check for replies and it's validity, it only sends messages to + * help keep TCP connection alive through firewalls/routers + * + * @param seconds + */ + void setKeepAlive(size_t seconds){ _keepAlivePeriod = seconds; }; + + // get keepalive value + size_t getKeepAlive() const { return _keepAlivePeriod; }; + + /** + * @brief activate server-side message echo + * when activated server will echo incoming messages from any client to all other connected clients. + * This could be usefull for applications that share messages between all connected clients, i.e. WebUIs + * to reflect controls across all connected clients + * @note only messages with text and binary types are echoed, control messages are not echoed + * + * @param enabled + * @param splitHorizon - when true echo message to all clients but the one from where message was received, + * when false, echo back message to all clients include the one who sent it + */ + void setServerEcho(bool enabled = true, bool splitHorizon = true){ _serverEcho = enabled, _serverEchoSplitHorizon = splitHorizon; }; + + // get server echo mode + bool getServerEcho() const { return _serverEcho; }; + + + /** + * @brief check if client with specified id can accept new message for sending + * + * @param id + * @return WSocketClient::err_t - ready to send only if returned value is err_t::ok, otherwise err reason is returned + */ + WSocketClient::err_t clientState(uint32_t id) const; + + /** + * @brief check if all of the connected clients are available for sending data, + * i.e. connection state available and outbound Q is not full + * + * @return msgall_err_t + */ + msgall_err_t clientsState() const; + + // return number of active (connected) clients + size_t activeClientsCount() const; + + // return number of active (connected) clients to specific endpoint + size_t activeEndpointClientsCount(std::string_view endpoint) const { return activeEndpointClientsCount(asyncsrv::hash_djb2a(endpoint)); } + size_t activeEndpointClientsCount(uint32_t hash) const; + + /** + * @brief Get ptr to client with specified id + * + * @param id + * @return WSocketClient* - nullptr if client not found + */ + WSocketClient* getClient(uint32_t id); + WSocketClient const* getClient(uint32_t id) const; + + // find if there is a client with specified id + bool hasClient(uint32_t id) const { + return getClient(id) != nullptr; + } + + /** + * @brief disconnect client + * + * @param id + * @param code + * @param message + */ + WSocketClient::err_t close(uint32_t id, uint16_t code = 1000, const char* message = NULL){ + if (WSocketClient* c = getClient(id)) + return c->close(code, message); + else return WSocketClient::err_t::na; + } + + /** + * @brief disconnect all clients + * + * @param code + * @param message + */ + void closeAll(uint16_t code = 1000, const char* message = NULL){ for (auto &c : _clients) { c.close(code, message); } } + + /** + * @brief sned ping to client + * + * @param id + * @param data + * @param len + * @return true + * @return false + */ + WSocketClient::err_t ping(uint32_t id, const char* data = NULL, size_t len = 0){ + if (WSocketClient *c = getClient(id)) + return c->ping(data, len); + else return WSocketClient::err_t::na; + } + + /** + * @brief send ping to all clients + * + * @param data + * @param len + * @return msgall_err_t + */ + msgall_err_t pingAll(const char* data = NULL, size_t len = 0); + + /** + * @brief send generic message to specific client + * + * @param id + * @param m + * @return WSocketClient::err_t + */ + WSocketClient::err_t message(uint32_t clientid, WSMessagePtr m); + + /** + * @brief Send message to all clients bound to specified endpoint + * + * @param hash endpoint hash + * @param m message + * @return WSocketClient::err_t + */ + msgall_err_t messageToEndpoint(uint32_t hash, WSMessagePtr m); + + /** + * @brief Send message to all clients bound to specified endpoint + * + * @param urlpath endpoint path + * @param m message + * @return WSocketClient::err_t + */ + msgall_err_t messageToEndpoint(std::string_view urlpath, WSMessagePtr m){ return messageToEndpoint(asyncsrv::hash_djb2a(urlpath) , m); }; + + /** + * @brief send generic message to all available clients + * + * @param m + * @return msgall_err_t + */ + msgall_err_t messageAll(WSMessagePtr m); + + /** + * @brief Send text message to client + * this template can accept anything that std::string can be made of + * + * @tparam Args + * @param id + * @param args + * @return WSocketClient::err_t + */ + template + WSocketClient::err_t text(uint32_t id, Args&&... args){ + if (hasClient(id)) + return message(id, std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + else return WSocketClient::err_t::na; + } + + /** + * @brief Send text message to all clients in specified endpoint + * this template can accept anything that std::string can be made of + * + * @tparam Args + * @param hash urlpath hash to send to + * @param args + * @return WSocketClient::err_t + */ + template + msgall_err_t textToEndpoint(uint32_t hash, Args&&... args){ + return messageToEndpoint(hash, std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + } + + template + msgall_err_t textToEndpoint(std::string_view urlpath, Args&&... args){ + return textToEndpoint(asyncsrv::hash_djb2a(urlpath), std::forward(args)...); + } + + /** + * @brief Send text message to all avalable clients + * this template can accept anything that std::string can be made of + * + * @tparam Args + * @param args + * @return WSocketClient::err_t + */ + template + msgall_err_t textAll(Args&&... args){ + return messageAll(std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + } + + /** + * @brief Send String text message to client + * this template can accept anything that Arduino String can be made of + * + * @tparam Args Arduino String constructor arguments + * @param id clien id to send message to + * @param args + * @return WSocketClient::err_t + */ + template + WSocketClient::err_t string(uint32_t id, Args&&... args){ + if (hasClient(id)) + return message(std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + else return WSocketClient::err_t::na; + } + + /** + * @brief Send String text message to all clients in specified endpoint + * this template can accept anything that Arduino String can be made of + * + * @tparam Args + * @param hash urlpath hash to send to + * @param args Arduino String constructor arguments + * @return WSocketClient::err_t + */ + template + msgall_err_t stringToEndpoint(uint32_t hash, Args&&... args){ + return messageToEndpoint(hash, std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + } + + template + msgall_err_t stringToEndpoint(std::string_view urlpath, Args&&... args){ + return stringToEndpoint(asyncsrv::hash_djb2a(urlpath), std::forward(args)...); + } + + /** + * @brief Send String text message all avalable clients + * this template can accept anything that Arduino String can be made of + * + * @tparam Args + * @param id client id to send message to + * @param args Arduino String constructor arguments + * @return WSocketClient::err_t + */ + template + msgall_err_t stringAll(Args&&... args){ + return messageAll(std::make_shared>(WSFrameType_t::text, true, std::forward(args)...)); + } + + /** + * @brief Send binary message to client + * this template can accept anything that std::vector can be made of + * + * @tparam Args + * @param id client id to send message to + * @param args std::vector constructor arguments + * @return WSocketClient::err_t + */ + template + WSocketClient::err_t binary(uint32_t id, Args&&... args){ + if (hasClient(id)) + return message(id, std::make_shared< WSMessageContainer> >(WSFrameType_t::binary, true, std::forward(args)...)); + else return WSocketClient::err_t::na; + } + + /** + * @brief Send binary message to all clients in specified endpoint + * this template can accept anything that std::vector can be made of + * + * @tparam Args + * @param hash urlpath hash to send to + * @param args std::vector constructor arguments + * @return WSocketClient::err_t + */ + template + msgall_err_t binaryToEndpoint(uint32_t hash, Args&&... args){ + return messageToEndpoint(std::make_shared< WSMessageContainer> >(hash, WSFrameType_t::binary, true, std::forward(args)...)); + } + + template + msgall_err_t binaryToEndpoint(std::string_view urlpath, Args&&... args){ + return binaryToEndpoint(std::make_shared< WSMessageContainer> >(asyncsrv::hash_djb2a(urlpath), WSFrameType_t::binary, true, std::forward(args)...)); + } + + /** + * @brief Send binary message all avalable clients + * this template can accept anything that std::vector can be made of + * + * @tparam Args + * @param id client id to send message to + * @param args std::vector constructor arguments + * @return WSocketClient::err_t + */ + template + msgall_err_t binaryAll(Args&&... args){ + return messageAll(std::make_shared< WSMessageContainer> >(WSFrameType_t::binary, true, std::forward(args)...)); + } + + // set webhanshake handler + void handleHandshake(AwsHandshakeHandler handler) { + _handshakeHandler = handler; + } + + /** + * @brief callback for AsyncServer - onboard new ws client + * + * @param request + * @return true + * @return false + */ + virtual bool newClient(AsyncWebServerRequest* request); + +protected: + // a list of url hashes this server is bound to + std::vector _urlhashes; + // WSocketClient events handler + WSocketClient::event_cb_t eventHandler; + + std::list _clients; + #ifdef ESP32 + std::mutex clientslock; + #endif + unsigned long _keepAlivePeriod{0}; + // max message size + size_t msgsize; + // client's queue capacity + size_t qcap; + + // return next available client's ID + uint32_t getNextId() { + return ++_cNextId; + } + + void serverEcho(WSocketClient *c); + + /** + * @brief go through clients list and remove those ones that are disconnected and have no messages pending + * + */ + void _purgeClients(); + +private: + AwsHandshakeHandler _handshakeHandler; + uint32_t _cNextId{0}; + WSocketClient::overflow_t _overflow_policy{WSocketClient::overflow_t::disconnect}; + bool _serverEcho{false}, _serverEchoSplitHorizon; + + + // WebServer methods + bool canHandle(AsyncWebServerRequest *request) const override final; + void handleRequest(AsyncWebServerRequest *request) override final; +}; + +/** + * @brief WebServer Handler implementation that plays the role of a WebSocket server + * + */ +class WSocketServerWorker : public WSocketServer { +public: + + // message callback alias + using msg_cb_t = std::function; + + explicit WSocketServerWorker(const char* url, msg_cb_t msg_handler, WSocketClient::event_cb_t event_handler, size_t msgsize = 8 * 1024, size_t qcap = 4) + : WSocketServer(url, nullptr, msgsize, qcap), _mcb(msg_handler), _ecb(event_handler) {} + + ~WSocketServerWorker(){ stop(); }; + + /** + * @brief start worker task to process WS Messages + * + * @param stack + * @param uxPriority + * @param xCoreID + */ + void start(uint32_t stack = 4096, UBaseType_t uxPriority = 4, BaseType_t xCoreID = tskNO_AFFINITY); + + /** + * @brief stop worker task + * + */ + void stop(); + + /** + * @brief Set Message Callback function + * + * @param handler + */ + void setMessageHandler(msg_cb_t handler){ _mcb = handler; }; + + /** + * @brief Set Event Callback function + * + * @param handler + */ + void setEventHandler(WSocketClient::event_cb_t handler){ _ecb = handler; }; + + /** + * @brief callback for AsyncServer - onboard new ws client + * + * @param request + * @return true + * @return false + */ + bool newClient(AsyncWebServerRequest *request) override; + + +private: + msg_cb_t _mcb; + WSocketClient::event_cb_t _ecb; + // worker task that handles messages + TaskHandle_t _task_hndlr{nullptr}; + void _taskRunner(); + +}; + +#endif // ESP32 +#else // __cplusplus >= 201703L +#warning "WSocket requires C++17, won't build" +#endif // __cplusplus >= 201703L diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index 0cd17493b..8f08c1abd 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -816,10 +816,10 @@ void AsyncWebSocket::_handleEvent(AsyncWebSocketClient *client, AwsEventType typ } } -AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request) { +bool AsyncWebSocket::newClient(AsyncWebServerRequest *request) { _clients.emplace_back(request, this); _handleEvent(&_clients.back(), WS_EVT_CONNECT, request, NULL, 0); - return &_clients.back(); + return true; } void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) { @@ -1228,7 +1228,7 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request) { return; } const AsyncWebHeader *key = request->getHeader(WS_STR_KEY); - AsyncWebServerResponse *response = new AsyncWebSocketResponse(key->value(), this); + AsyncWebServerResponse *response = new AsyncWebSocketResponse(key->value(), [this](AsyncWebServerRequest *r){ return newClient(r); }); if (response == NULL) { #ifdef ESP32 log_e("Failed to allocate"); @@ -1257,8 +1257,7 @@ AsyncWebSocketMessageBuffer *AsyncWebSocket::makeBuffer(const uint8_t *data, siz * Authentication code from https://github.com/Links2004/arduinoWebSockets/blob/master/src/WebSockets.cpp#L480 */ -AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AsyncWebSocket *server) { - _server = server; +AsyncWebSocketResponse::AsyncWebSocketResponse(const String &key, AwsHandshakeHandler cb) : _callback(cb) { _code = 101; _sendContentLength = false; @@ -1314,7 +1313,7 @@ size_t AsyncWebSocketResponse::_ack(AsyncWebServerRequest *request, size_t len, (void)time; if (len) { - _server->_newClient(request); + _callback(request); } return 0; diff --git a/src/AsyncWebSocket.h b/src/AsyncWebSocket.h index 46cdb5e50..e4bf76163 100644 --- a/src/AsyncWebSocket.h +++ b/src/AsyncWebSocket.h @@ -445,7 +445,8 @@ class AsyncWebSocket : public AsyncWebHandler { uint32_t _getNextId() { return _cNextId++; } - AsyncWebSocketClient *_newClient(AsyncWebServerRequest *request); + // callback function that takes the ownership of the connected client, called from a AsyncWebSocketResponse instance + bool newClient(AsyncWebServerRequest *request); void _handleDisconnect(AsyncWebSocketClient *client); void _handleEvent(AsyncWebSocketClient *client, AwsEventType type, void *arg, uint8_t *data, size_t len); bool canHandle(AsyncWebServerRequest *request) const override final; @@ -464,10 +465,10 @@ class AsyncWebSocket : public AsyncWebHandler { class AsyncWebSocketResponse : public AsyncWebServerResponse { private: String _content; - AsyncWebSocket *_server; + AwsHandshakeHandler _callback; public: - AsyncWebSocketResponse(const String &key, AsyncWebSocket *server); + AsyncWebSocketResponse(const String &key, AwsHandshakeHandler cb); void _respond(AsyncWebServerRequest *request); size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time); bool _sourceValid() const { diff --git a/wsocket_examples/MultiEndpoint/MultiEndpoint.ino b/wsocket_examples/MultiEndpoint/MultiEndpoint.ino new file mode 100644 index 000000000..9aa71d806 --- /dev/null +++ b/wsocket_examples/MultiEndpoint/MultiEndpoint.ino @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +/* + this example shows how to handle multiple websocket endpoints within same server instance + +*/ + +#include +#include +#include "endpoints.h" + +#define WIFI_SSID "your_ssid" +#define WIFI_PASSWD "your_pass" + +// WebSocket endpoints to serve +constexpr const char WSENDPOINT_DEFAULT[] = "/ws"; // default endpoint - used for logging messages +constexpr const char WSENDPOINT_ECHO[] = "/wsecho"; // echo server - it replies back messages it receives +constexpr const char WSENDPOINT_SPEED[] = "/wsspeed"; // upstream speed test - sending large data chunks from server to browser + +// *** Event handlers *** +// WS Events dispatcher +void wsEventDispatcher(WSocketClient *client, WSocketClient::event_t event); +// speed tester +void wsSpeedService(WSocketClient *client, WSocketClient::event_t event); +// echo service +void wsEchoService(WSocketClient *client, WSocketClient::event_t event); +// default service +void wsDefaultService(WSocketClient *client, WSocketClient::event_t event); + +// Web Sever +AsyncWebServer server(80); +// WebSocket server instance +WSocketServer ws(WSENDPOINT_DEFAULT, wsEventDispatcher); + +// this function is attached as main callback function for websocket events +void wsEventDispatcher(WSocketClient *client, WSocketClient::event_t event){ + if (event == WSocketClient::event_t::connect || event == WSocketClient::event_t::disconnect){ + // report all new connections to default endpoint + char buff[100]; + snprintf(buff, 100, "Client %s, id:%lu, IP:%s:%u\n", + event == WSocketClient::event_t::connect ? "connected" : "disconnected" , + client->id, + IPAddress (client->client()->getRemoteAddress4().addr).toString().c_str(), + client->client()->getRemotePort() + ); + // send message to clients connected to default /ws endpoint + ws.textToEndpoint(WSENDPOINT_DEFAULT, buff); + Serial.print(buff); + } + + // here we identify on which endpoint we received and event and dispatch to the corresponding handler + switch (client->getURLHash()) + { + case asyncsrv::hash_djb2a(WSENDPOINT_ECHO) : + wsEchoService(client, event); + break; + + case asyncsrv::hash_djb2a(WSENDPOINT_SPEED) : + wsSpeedService(client, event); + break; + + default: + wsDefaultService(client, event); + break; + } + +} + + +// default service - we will use it for event logging and information reports +void wsDefaultService(WSocketClient *client, WSocketClient::event_t event){ + switch (event){ + case WSocketClient::event_t::msgRecv : { + // we do nothing but discard messages here (if any), no use for now + client->dequeueMessage(); + break; + } + + default:; + } +} + +// default service - we will use it for event logging and information reports +void wsEchoService(WSocketClient *client, WSocketClient::event_t event){ + switch (event){ + case WSocketClient::event_t::connect : { + ws.text(client->id, "Hello Client, this is an echo endpoint, message me something and I will reply it back"); + break; + } + + // incoming message + case WSocketClient::event_t::msgRecv : { + auto m = client->dequeueMessage(); + if (m->type == WSFrameType_t::text){ + // got a text message, reformat it and reply + std::string msg("Your message was: "); + msg.append(m->getData()); + // avoid copy and move string to message queue + ws.text(client->id, std::move(msg)); + } + + break; + } + default:; + } +} + + +// for speed test load +uint8_t *buff = nullptr; +size_t buff_size = 32 * 1024; // will send 32k message buffer +//size_t cnt{0}; +// a unique stream id - it is used to avoid sending dublicate frames when multiple clients connected to speedtest endpoint +uint32_t client_id{0}; + +void bulkSend(uint32_t token){ + if (!buff) return; + if (client_id == 0){ + // first client connected grabs the stream + client_id = token; + } else if (token != client_id) { + // we will send next frame only upon delivery the previous one to client owning the stream, for others we ignore + // this is to avoid stacking new frames in the Q when multiple clients are connected to the server + return; + } + + // generate metadata info frame + // this text frame will carry our resources stat + char msg[120]; + snprintf(msg, 120, "FrameSize:%u, Mem:%lu, psram:%lu, token:%lu", buff_size, ESP.getFreeHeap(), ESP.getFreePsram(), client_id); + + ws.textToEndpoint(WSENDPOINT_SPEED, msg); + + // here we MUST ensure that client owning the stream is able to send data, otherwise recursion would crash controller + if (ws.clientState(client_id) == WSocketClient::err_t::ok){ + // for bulk load sending we will use WSMessageStaticBlob object, it will directly send + // payload to websocket peers without intermediate buffer copies and + // it is the most efficient way to send large objects from memory/ROM + auto m = std::make_shared( + WSFrameType_t::binary, // bynary message + true, // final message + reinterpret_cast(buff), buff_size, // buffer to transfer + // the key here to understand when frame buffer completes delivery - for this we set + // the callback back to ourself, so that when when + // this frame would complete delivery, this function is called again to obtain a new frame buffer from camera + [](WSMessageStatus_t s, uint32_t t){ bulkSend(t); }, + client_id // message token + ); + // send message to all peers of this endpoint + ws.messageToEndpoint(WSENDPOINT_SPEED, m); + //++cnt; + } else { + client_id = 0; + } +} + +// speed tester - this endpoint will send bulk dummy payload to anyone who connects here +void wsSpeedService(WSocketClient *client, WSocketClient::event_t event){ + switch (event){ + case WSocketClient::event_t::connect : { + // prepare a buffer with some junk data + if (!buff) + buff = (uint8_t*)malloc(buff_size); + // start an endless bulk transfer + bulkSend(client->id); + break; + } + + // incoming message + case WSocketClient::event_t::msgRecv : { + // silently discard here everything comes in + client->dequeueMessage(); + } + break; + + case WSocketClient::event_t::disconnect : + // if no more clients are connected, release memory + if ( ws.activeEndpointClientsCount(WSENDPOINT_SPEED) == 0){ + delete buff; + buff = nullptr; + } + break; + + default:; + } + +} + + + +// setup our server +void setup() { + Serial.begin(115200); + +#ifndef CONFIG_IDF_TARGET_ESP32H2 + WiFi.mode(WIFI_STA); + WiFi.begin(WIFI_SSID, WIFI_PASSWD); + //WiFi.softAP("esp-captive"); +#endif + // Wait for connection + while (WiFi.status() != WL_CONNECTED) { + delay(250); + Serial.print("."); + } + Serial.println(""); + Serial.print("Connected to "); + //Serial.println(ssid); + Serial.print("IP address: "); + Serial.println(WiFi.localIP()); + Serial.printf("Open the browser and connect to http://%s/\n", WiFi.localIP()); + + // HTTP endpoint + server.on("/", HTTP_GET, [](AsyncWebServerRequest *request) { + // need to cast to uint8_t* + // if you do not, the const char* will be copied in a temporary String buffer + request->send(200, "text/html", (uint8_t *)htmlPage, std::string_view(htmlPage).length()); + }); + + // add endpoint for bulk speed testing + ws.addURLendpoint("/wsspeed"); + + // add endpoint for message echo testing + ws.addURLendpoint("/wsecho"); + + // attach WebSocket server to web server + server.addHandler(&ws); + + // start server + server.begin(); + + //log_e("e setup end"); + //log_w("w server started"); + //log_d("d server debug"); +} + + +void loop() { + // nothing to do here + vTaskDelete(NULL); +} diff --git a/wsocket_examples/MultiEndpoint/endpoints.h b/wsocket_examples/MultiEndpoint/endpoints.h new file mode 100644 index 000000000..331d1d111 --- /dev/null +++ b/wsocket_examples/MultiEndpoint/endpoints.h @@ -0,0 +1,837 @@ +// this page provides minimalistic WebSocket testing dashboard +// disclaimer: this page and code was generated with the help of DeepSeek AI tool + +static const char *htmlPage = R"( + + + + + + WebSocket Dashboard + + + + +
+
+

WebSocket Dashboard

+
+ Server: ws://localhost:8080 +
+
+ + +
+
+

Log Messages /ws

+
+ Status: Disconnected +
+
+
+
+
+
+ + +
+
+

Echo Chat /wsecho

+
+ + +
+ Status: Disconnected +
+
+
+
+
+
+ + +
+
+
+ + +
+
+

Speed Test /wsspeed

+
+ + +
+ Status: Disconnected +
+
+
+
+
+
+ + 0 +
+
+ + 0 +
+
+ + 0 B/s +
+
+ + 0 B +
+
+ + 0 B +
+
+ + 0s +
+
+
+
+
+ + + + +)"; diff --git a/wsocket_examples/VideoStreaming/VideoStreaming.ino b/wsocket_examples/VideoStreaming/VideoStreaming.ino new file mode 100644 index 000000000..fde992e59 --- /dev/null +++ b/wsocket_examples/VideoStreaming/VideoStreaming.ino @@ -0,0 +1,213 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +// +// MJPEG Cam Video streaming over WebSockets using AI-Thinker ESP32-CAM module +// you would need a module with camera to run this example https://www.espboards.dev/esp32/esp32cam/ + +/* + This example implements a WebCam streaming in browser using cheap ESP32-cam module. + The feature here is that frames are trasfered to the browser via WebSockets, + it has several advantages over traditional streamed multipart content via HTTP + - websockets delivers each frame in a separate message + - webserver is not blocked when stream is flowing, you can still send/receive other data via HTTP + - websockets can multiplex vide data and control messages, in this example you can also get memory + stats / frame sizing from controller along with video stream + - WSocketServer can easily replicate stream to multiple connected clients + here in this example you can connect 2-3 clients simultaneously and get smooth stream (limited by wifi bandwidth) + + Change you WiFi creds below, build and flash the code. + Connect to console to monitor for debug messages + Figure out IP of your board and access the board via web browser, watch for video stream +*/ + +#include +#include +#include "esp_camera.h" +#include "mjpeg.h" + + +#define WIFI_SSID "your_ssid" +#define WIFI_PASSWD "your_pass" + +// AI-Thinker ESP32-CAM config - for more details see https://github.com/rzeldent/esp32cam-rtsp/ project +camera_config_t cfg { + .pin_pwdn = 32, + .pin_reset = -1, + .pin_xclk = 0, + + .pin_sscb_sda = 26, + .pin_sscb_scl = 27, + + // Note: LED GPIO is apparently 4 not sure where that goes + // per https://github.com/donny681/ESP32_CAMERA_QR/blob/e4ef44549876457cd841f33a0892c82a71f35358/main/led.c + .pin_d7 = 35, + .pin_d6 = 34, + .pin_d5 = 39, + .pin_d4 = 36, + .pin_d3 = 21, + .pin_d2 = 19, + .pin_d1 = 18, + .pin_d0 = 5, + .pin_vsync = 25, + .pin_href = 23, + .pin_pclk = 22, + .xclk_freq_hz = 20000000, + .ledc_timer = LEDC_TIMER_1, + .ledc_channel = LEDC_CHANNEL_1, + .pixel_format = PIXFORMAT_JPEG, + // .frame_size = FRAMESIZE_UXGA, // needs 234K of framebuffer space + // .frame_size = FRAMESIZE_SXGA, // needs 160K for framebuffer + // .frame_size = FRAMESIZE_XGA, // needs 96K or even smaller FRAMESIZE_SVGA - can work if using only 1 fb + .frame_size = FRAMESIZE_SVGA, + .jpeg_quality = 15, //0-63 lower numbers are higher quality + .fb_count = 2, // if more than one i2s runs in continous mode. Use only with jpeg + .fb_location = CAMERA_FB_IN_PSRAM, + .grab_mode = CAMERA_GRAB_LATEST, + .sccb_i2c_port = 0 +}; + +// camera frame buffer pointer +camera_fb_t *fb{nullptr}; + +// WS Event server callback declaration +void wsEvent(WSocketClient *client, WSocketClient::event_t event); + +// Our WebServer +AsyncWebServer server(80); +// WSocket Server URL and callback function +WSocketServer ws("/wsstream", wsEvent); +// a unique steam id - it is used to avoid sending dublicate frames when multiple clients connected to stream +uint32_t client_id{0}; + +void sendFrame(uint32_t token){ + if (client_id == 0){ + // first client connected grabs the stream token + client_id = token; + } else if (token != client_id) { + // we will send next frame only upon delivery the previous one to client owning the token, others we ignore + // this is to avoid stacking frames clones in the Q when multiple clients are connected to the server + return; + } + + //return the frame buffer back to the driver for reuse + esp_camera_fb_return(fb); + // get 2nd buffer from camera + fb = esp_camera_fb_get(); + + // generate metadata info frame + // this text frame contains memory stat and will be displayed along video stream + char buff[100]; + snprintf(buff, 100, "FrameSize:%u, Mem:%lu, PSRAM:%lu", fb->len, ESP.getFreeHeap(), ESP.getFreePsram()); + ws.textAll(buff); + + // here we MUST ensure that client owning the stream is able to send data, otherwise recursion would crash controller + if (ws.clientState(client_id) == WSocketClient::err_t::ok){ + /* + for video frame sending we will use WSMessageStaticBlob object. + It can send large memory buffers directly to websocket peers without intermediate buffering and data copies + and it is the most efficient way to send static data + */ + auto m = std::make_shared( + WSFrameType_t::binary, // binary message + true, // final message + reinterpret_cast(fb->buf), fb->len, // buffer to transfer + // the key here to understand when frame buffer completes delivery - for this we set + // the callback back to ourself, so that when when frame delivery would be completed, + // this function is called again to obtain a new frame buffer from camera + [](WSMessageStatus_t s, uint32_t t){ sendFrame(t); }, // a callback executed on message delivery + client_id // stream token + ); + // replicate frame to ALL peers + ws.messageAll(m); + } else { + // current client can't receive stream (maybe he disconnected), we reset token here so that other client + // can reconnect and take the ownership of the stream + client_id = 0; + } + + /* + Note! Though this example is able to send video stream to multiple clients simultaneously, it has one gap - + when same buffer is streamed to multiple peers and the 'owner' of stream completes transfer, others might + still be in-progress. The buffer pointer is switched to next one from camera only upon full delivery on + message object destruction. It does not allow pipelining and slower clients could affect the others. + The question of synchronization multiple clients is out of scope of this simple example. It's just a + demonstarion of working with WebSockets. + */ +} + +void wsEvent(WSocketClient *client, WSocketClient::event_t event){ + switch (event){ + // new client connected + case WSocketClient::event_t::connect : { + Serial.printf("Client id:%lu connected\n", client->id); + if (fb) + sendFrame(client->id); + else + ws.text(client->id, "Cam init failed!"); + break; + } + + // client diconnected + case WSocketClient::event_t::disconnect : { + Serial.printf("Client id:%lu disconnected\n", client->id); + if (client_id == client->id) + // reset stream token + client_id = 0; + break; + } + + // any other events + default:; + // incoming messages could be used for controls or any other functionality + // not implemented in this example but should be considered + // If not discaded messages will overflow incoming Q + client->dequeueMessage(); + break; + } +} + +void setup() { + Serial.begin(115200); + + WiFi.mode(WIFI_STA); + WiFi.begin(WIFI_SSID, WIFI_PASSWD); + + // Wait for connection + while (WiFi.status() != WL_CONNECTED) { + delay(250); + Serial.print("."); + } + Serial.println(); + Serial.print("Connected to "); + Serial.println(WIFI_SSID); + Serial.print("IP address: "); + Serial.println(WiFi.localIP()); + Serial.println(); + Serial.printf("to access VideoStream pls open http://%s/\n", WiFi.localIP().toString().c_str()); + + // init camera + esp_err_t err = esp_camera_init(&cfg); + if (err != ESP_OK) + { + Serial.printf("Camera probe failed with error 0x%x\n", err); + } else + fb = esp_camera_fb_get(); + + // server serves index page + server.on("/", HTTP_GET, [](AsyncWebServerRequest *request) { + // need to cast to uint8_t* + // if you do not, the const char* will be copied in a temporary String buffer + request->send(200, "text/html", (uint8_t *)htmlPage, std::string_view(htmlPage).length()); + }); + + // attach our WSocketServer + server.addHandler(&ws); + server.begin(); +} + + +void loop() { + // nothing to do here at all + vTaskDelete(NULL); +} diff --git a/wsocket_examples/VideoStreaming/mjpeg.h b/wsocket_examples/VideoStreaming/mjpeg.h new file mode 100644 index 000000000..2b1b65911 --- /dev/null +++ b/wsocket_examples/VideoStreaming/mjpeg.h @@ -0,0 +1,570 @@ +// this page provides minimalistic WebSocket MJPEG CAM streaming app +// disclaimer: this page and code was generated with the help of DeepSeek AI tool + +static const char *htmlPage = R"( + + + + + + MJPEG WebSocket Stream + + + + +
+

MJPEG WebSocket Video Stream

+ +
+ + +
+ Status: Disconnected +
+
+ +
+ +
+ +
+
+

FPS: 0

+

Frame Count: 0

+
+
+

Frame Size: 0 bytes

+

Memory: 0 bytes free

+

PSRAM: 0 bytes free

+
+
+
+ + + + +)"; diff --git a/wsocket_examples/WebChat/WebChat.ino b/wsocket_examples/WebChat/WebChat.ino new file mode 100644 index 000000000..81d03592e --- /dev/null +++ b/wsocket_examples/WebChat/WebChat.ino @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +// +// A simple WebChat room working over WebSockets +// + +/* + This example would show how WSocketServer could register connections/disconnections for every new user, + deliver messages to all connected user and replicate incoming data amoung all members of chat room. + Change you WiFi creds below, build and flash the code. + Connect to console to monitor debug messages. + Figure out IP of your board and access the board via web browser using two or more different devices, + Chat with yourself, have fun :) +*/ + +#include +#include +#include "html.h" + +#define WIFI_SSID "YourSSID" +#define WIFI_PASSWD "YourPasswd" + +// WS Event server callback declaration +void wsEvent(WSocketClient *client, WSocketClient::event_t event); + +// Our WebServer +AsyncWebServer server(80); +// WSocket Server URL and callback function +WSocketServer ws("/chat", wsEvent); + +void wsEvent(WSocketClient *client, WSocketClient::event_t event){ + switch (event){ + // new client connected + case WSocketClient::event_t::connect : { + Serial.printf("Client id:%u connected\n", client->id); + char buff[100]; + snprintf(buff, 100, "WServer: Hello user, your id is:%u, there are %u members on-line, pls be polite here!", client->id, ws.activeClientsCount()); + // greet new user personally + ws.text(client->id, buff); + snprintf(buff, 100, "WServer: New client with id:%u joined the room", client->id); + // Announce new user entered the room + ws.textAll(client->id, buff); + break; + } + + // client diconnected + case WSocketClient::event_t::disconnect : { + Serial.printf("Client id:%u disconnected\n", client->id); + char buff[100]; + snprintf(buff, 100, "WServer: Client with id:%u left the room, %u members on-line", client->id, ws.activeClientsCount()); + ws.textAll(client->id, buff); + break; + } + + // new messages from clients + case WSocketClient::event_t::msgRecv : + // any incoming messages we must deQ and discard, + // there is no need to resend it back to, it's handled on the server side. + // If not discaded messages will overflow incoming Q + client->dequeueMessage(); + break; + + default:; + } +} + + +void setup() { + Serial.begin(115200); + + WiFi.mode(WIFI_STA); + WiFi.begin(WIFI_SSID, WIFI_PASSWD); + + // Wait for connection + while (WiFi.status() != WL_CONNECTED) { + delay(250); + Serial.print("."); + } + Serial.println(); + Serial.println("Connected to WIFI_SSID"); + Serial.print("IP address: "); + Serial.println(WiFi.localIP()); + Serial.println(); + Serial.printf("to access WebChat pls open http://%s/\n", WiFi.localIP().toString().c_str()); + + server.on("/", HTTP_GET, [](AsyncWebServerRequest *request) { + // need to cast to uint8_t* + // if you do not, the const char* will be copied in a temporary String buffer + request->send(200, "text/html", (uint8_t *)htmlChatPage, strlen(htmlChatPage)); + }); + + // keep TCP/WS connection open + ws.setKeepAlive(20); + + /* + Enable server echo to reflect incoming messages to all participans of the current chat room + */ + ws.setServerEcho(true, true); // 2nd 'true' here is 'SplitHorizon' - server will reflect all message to every peer except the one where it came from + + server.addHandler(&ws); + + server.begin(); +} + + +void loop() { + // nothing to do here + delay(100); +} diff --git a/wsocket_examples/WebChat/html.h b/wsocket_examples/WebChat/html.h new file mode 100644 index 000000000..e1ff64690 --- /dev/null +++ b/wsocket_examples/WebChat/html.h @@ -0,0 +1,249 @@ +// this page provides minimalistic WebChat room +// disclaimer: this page and code was generated with the help of AI tools + +static const char *htmlChatPage = R"( + + + + + + AsyncWebSocket Chat example + + + + + +
+
+

Enter your username

+ +
+ +
+
+ + +
+ + +
+ + +
+ + + + +)";