First steps to do async

This commit is contained in:
Markus Sattler
2016-01-23 16:27:02 +01:00
parent ece771a275
commit 57e30e0634
8 changed files with 435 additions and 206 deletions

View File

@@ -30,9 +30,9 @@
extern "C" {
#ifdef CORE_HAS_LIBB64
#include <libb64/cencode.h>
#include <libb64/cencode.h>
#else
#include "libb64/cencode_inc.h"
#include "libb64/cencode_inc.h"
#endif
}
@@ -46,8 +46,6 @@ extern "C" {
#endif
#define WEBSOCKETS_MAX_HEADER_SIZE (14)
/**
*
* @param client WSclient_t * ptr to the client struct
@@ -120,7 +118,6 @@ void WebSockets::sendFrame(WSclient_t * client, WSopcode_t opcode, uint8_t * pay
headerSize += 4;
}
#ifdef WEBSOCKETS_USE_BIG_MEM
// only for ESP since AVR has less HEAP
// try to send data in one TCP package (only if some free Heap is there)
@@ -161,22 +158,35 @@ void WebSockets::sendFrame(WSclient_t * client, WSopcode_t opcode, uint8_t * pay
}
if(length < 126) {
*headerPtr |= length; headerPtr++;
*headerPtr |= length;
headerPtr++;
} else if(length < 0xFFFF) {
*headerPtr |= 126; headerPtr++;
*headerPtr = ((length >> 8) & 0xFF); headerPtr++;
*headerPtr = (length & 0xFF); headerPtr++;
*headerPtr |= 126;
headerPtr++;
*headerPtr = ((length >> 8) & 0xFF);
headerPtr++;
*headerPtr = (length & 0xFF);
headerPtr++;
} else {
// Normally we never get here (to less memory)
*headerPtr |= 127; headerPtr++;
*headerPtr = 0x00; headerPtr++;
*headerPtr = 0x00; headerPtr++;
*headerPtr = 0x00; headerPtr++;
*headerPtr = 0x00; headerPtr++;
*headerPtr = ((length >> 24) & 0xFF); headerPtr++;
*headerPtr = ((length >> 16) & 0xFF); headerPtr++;
*headerPtr = ((length >> 8) & 0xFF); headerPtr++;
*headerPtr = (length & 0xFF); headerPtr++;
*headerPtr |= 127;
headerPtr++;
*headerPtr = 0x00;
headerPtr++;
*headerPtr = 0x00;
headerPtr++;
*headerPtr = 0x00;
headerPtr++;
*headerPtr = 0x00;
headerPtr++;
*headerPtr = ((length >> 24) & 0xFF);
headerPtr++;
*headerPtr = ((length >> 16) & 0xFF);
headerPtr++;
*headerPtr = ((length >> 8) & 0xFF);
headerPtr++;
*headerPtr = (length & 0xFF);
headerPtr++;
}
if(mask) {
@@ -185,7 +195,8 @@ void WebSockets::sendFrame(WSclient_t * client, WSopcode_t opcode, uint8_t * pay
// by this fact its possible the do the masking
for(uint8_t x = 0; x < sizeof(maskKey); x++) {
maskKey[x] = random(0xFF);
*headerPtr = maskKey[x]; headerPtr++;
*headerPtr = maskKey[x];
headerPtr++;
}
uint8_t * dataMaskPtr;
@@ -201,10 +212,14 @@ void WebSockets::sendFrame(WSclient_t * client, WSopcode_t opcode, uint8_t * pay
}
} else {
*headerPtr = maskKey[0]; headerPtr++;
*headerPtr = maskKey[1]; headerPtr++;
*headerPtr = maskKey[2]; headerPtr++;
*headerPtr = maskKey[3]; headerPtr++;
*headerPtr = maskKey[0];
headerPtr++;
*headerPtr = maskKey[1];
headerPtr++;
*headerPtr = maskKey[2];
headerPtr++;
*headerPtr = maskKey[3];
headerPtr++;
}
}
@@ -237,154 +252,211 @@ void WebSockets::sendFrame(WSclient_t * client, WSopcode_t opcode, uint8_t * pay
}
/**
* callen when HTTP header is done
* @param client WSclient_t * ptr to the client struct
*/
void WebSockets::headerDone(WSclient_t * client) {
client->status = WSC_CONNECTED;
client->cWsRXsize = 0;
DEBUG_WEBSOCKETS("[WS][%d][headerDone] Header Handling Done (%uus).\n", client->num);
#if (WEBSOCKETS_NETWORK_TYPE == NETWORK_ESP8266_ASYNC)
client->cHttpLine = "";
handleWebsocket(client);
#endif
}
/**
* handle the WebSocket stream
* @param client WSclient_t * ptr to the client struct
*/
void WebSockets::handleWebsocket(WSclient_t * client) {
if(client->cWsRXsize == 0) {
handleWebsocketCb(client);
}
}
uint8_t buffer[8] = { 0 };
/**
* wait for
* @param client
* @param size
*/
bool WebSockets::handleWebsocketWaitFor(WSclient_t * client, size_t size) {
if(size > WEBSOCKETS_MAX_HEADER_SIZE) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocketWaitFor] size: %d to big!\n", client->num, size);
return false;
}
bool fin;
bool rsv1;
bool rsv2;
bool rsv3;
WSopcode_t opCode;
bool mask;
size_t payloadLen;
if(client->cWsRXsize >= size) {
return true;
}
uint8_t maskKey[4];
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocketWaitFor] size: %d cWsRXsize: %d\n", client->num, size, client->cWsRXsize);
readCb(client, &client->cWsHeader[client->cWsRXsize], (size - client->cWsRXsize), std::bind([](WebSockets * server, size_t size, WSclient_t * client, bool ok) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocketWaitFor][readCb] size: %d ok: %d\n", client->num, size, ok);
if(ok) {
client->cWsRXsize = size;
server->handleWebsocketCb(client);
} else {
DEBUG_WEBSOCKETS("[WS][%d][readCb] failed.\n", client->num);
client->cWsRXsize = 0;
// timeout or error
server->clientDisconnect(client, 1002);
}
}, this, size, std::placeholders::_1, std::placeholders::_2));
return false;
}
void WebSockets::handleWebsocketCb(WSclient_t * client) {
uint8_t * buffer = client->cWsHeader;
WSMessageHeader_t * header = &client->cWsHeaderDecode;
uint8_t * payload = NULL;
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] ------- read massage frame -------\n", client->num);
uint8_t headerLen = 2;
if(!readWait(client, buffer, 2)) {
//timeout
clientDisconnect(client, 1002);
if(!handleWebsocketWaitFor(client, headerLen)) {
return;
}
// split first 2 bytes in the data
fin = ((buffer[0] >> 7) & 0x01);
rsv1 = ((buffer[0] >> 6) & 0x01);
rsv2 = ((buffer[0] >> 5) & 0x01);
rsv3 = ((buffer[0] >> 4) & 0x01);
opCode = (WSopcode_t) (buffer[0] & 0x0F);
header->fin = ((*buffer >> 7) & 0x01);
header->rsv1 = ((*buffer >> 6) & 0x01);
header->rsv2 = ((*buffer >> 5) & 0x01);
header->rsv3 = ((*buffer >> 4) & 0x01);
header->opCode = (WSopcode_t) (*buffer & 0x0F);
buffer++;
mask = ((buffer[1] >> 7) & 0x01);
payloadLen = (WSopcode_t) (buffer[1] & 0x7F);
header->mask = ((*buffer >> 7) & 0x01);
header->payloadLen = (WSopcode_t) (*buffer & 0x7F);
buffer++;
if(payloadLen == 126) {
if(!readWait(client, buffer, 2)) {
//timeout
clientDisconnect(client, 1002);
if(header->payloadLen == 126) {
headerLen += 4;
if(!handleWebsocketWaitFor(client, headerLen)) {
return;
}
payloadLen = buffer[0] << 8 | buffer[1];
} else if(payloadLen == 127) {
header->payloadLen = buffer[0] << 8 | buffer[1];
buffer += 2;
} else if(header->payloadLen == 127) {
headerLen += 8;
// read 64bit integer as length
if(!readWait(client, buffer, 8)) {
//timeout
clientDisconnect(client, 1002);
if(!handleWebsocketWaitFor(client, headerLen)) {
return;
}
if(buffer[0] != 0 || buffer[1] != 0 || buffer[2] != 0 || buffer[3] != 0) {
// really to big!
payloadLen = 0xFFFFFFFF;
header->payloadLen = 0xFFFFFFFF;
} else {
payloadLen = buffer[4] << 24 | buffer[5] << 16 | buffer[6] << 8 | buffer[7];
header->payloadLen = buffer[4] << 24 | buffer[5] << 16 | buffer[6] << 8 | buffer[7];
}
buffer += 8;
}
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] fin: %u rsv1: %u rsv2: %u rsv3 %u opCode: %u\n", client->num, fin, rsv1, rsv2, rsv3, opCode);
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] mask: %u payloadLen: %u\n", client->num, mask, payloadLen);
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] ------- read massage frame -------\n", client->num);
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] fin: %u rsv1: %u rsv2: %u rsv3 %u opCode: %u\n", client->num, header->fin, header->rsv1, header->rsv2, header->rsv3, header->opCode);
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] mask: %u payloadLen: %u\n", client->num, header->mask, header->payloadLen);
if(payloadLen > WEBSOCKETS_MAX_DATA_SIZE) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] payload to big! (%u)\n", client->num, payloadLen);
if(header->payloadLen > WEBSOCKETS_MAX_DATA_SIZE) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] payload to big! (%u)\n", client->num, header->payloadLen);
clientDisconnect(client, 1009);
return;
}
if(mask) {
if(!readWait(client, maskKey, 4)) {
//timeout
clientDisconnect(client, 1002);
if(header->mask) {
headerLen += 4;
if(!handleWebsocketWaitFor(client, headerLen)) {
return;
}
header->maskKey = buffer;
buffer += 4;
}
if(payloadLen > 0) {
if(header->payloadLen > 0) {
// if text data we need one more
payload = (uint8_t *) malloc(payloadLen + 1);
payload = (uint8_t *) malloc(header->payloadLen + 1);
if(!payload) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] to less memory to handle payload %d!\n", client->num, payloadLen);
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] to less memory to handle payload %d!\n", client->num, header->payloadLen);
clientDisconnect(client, 1011);
return;
}
readCb(client, payload, header->payloadLen, std::bind(&WebSockets::handleWebsocketPayloadCb, this, std::placeholders::_1, std::placeholders::_2, payload));
} else {
handleWebsocketPayloadCb(client, true, NULL);
}
}
if(!readWait(client, payload, payloadLen)) {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] missing data!\n", client->num);
free(payload);
clientDisconnect(client, 1002);
return;
}
void WebSockets::handleWebsocketPayloadCb(WSclient_t * client, bool ok, uint8_t * payload) {
WSMessageHeader_t * header = &client->cWsHeaderDecode;
if(ok) {
if(header->payloadLen > 0) {
payload[header->payloadLen] = 0x00;
payload[payloadLen] = 0x00;
if(mask) {
//decode XOR
for(size_t i = 0; i < payloadLen; i++) {
payload[i] = (payload[i] ^ maskKey[i % 4]);
if(header->mask) {
//decode XOR
for(size_t i = 0; i < header->payloadLen; i++) {
payload[i] = (payload[i] ^ header->maskKey[i % 4]);
}
}
}
}
switch(opCode) {
case WSop_text:
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] text: %s\n", client->num, payload);
// no break here!
case WSop_binary:
messageRecived(client, opCode, payload, payloadLen);
break;
case WSop_ping:
// send pong back
sendFrame(client, WSop_pong, payload, payloadLen);
break;
case WSop_pong:
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] get pong (%s)\n", client->num, payload);
break;
case WSop_close:
{
switch(header->opCode) {
case WSop_text:
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] text: %s\n", client->num, payload);
// no break here!
case WSop_binary:
messageRecived(client, header->opCode, payload, header->payloadLen);
break;
case WSop_ping:
// send pong back
sendFrame(client, WSop_pong, payload, header->payloadLen);
break;
case WSop_pong:
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] get pong (%s)\n", client->num, payload);
break;
case WSop_close: {
uint16_t reasonCode = 1000;
if(payloadLen >= 2) {
if(header->payloadLen >= 2) {
reasonCode = payload[0] << 8 | payload[1];
}
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] get ask for close. Code: %d", client->num, reasonCode);
if(payloadLen > 2) {
DEBUG_WEBSOCKETS(" (%s)\n", (payload+2));
if(header->payloadLen > 2) {
DEBUG_WEBSOCKETS(" (%s)\n", (payload + 2));
} else {
DEBUG_WEBSOCKETS("\n");
}
clientDisconnect(client, 1000);
}
break;
case WSop_continuation:
// continuation is not supported
clientDisconnect(client, 1003);
break;
default:
clientDisconnect(client, 1002);
break;
}
break;
case WSop_continuation:
// continuation is not supported
clientDisconnect(client, 1003);
break;
default:
clientDisconnect(client, 1002);
break;
}
if(payload) {
if(payload) {
free(payload);
}
// reset input
client->cWsRXsize = 0;
#if (WEBSOCKETS_NETWORK_TYPE == NETWORK_ESP8266_ASYNC)
//register callback for next message
handleWebsocketWaitFor(client, 2);
#endif
} else {
DEBUG_WEBSOCKETS("[WS][%d][handleWebsocket] missing data!\n", client->num);
free(payload);
clientDisconnect(client, 1002);
}
}
/**
@@ -417,7 +489,7 @@ String WebSockets::acceptKey(String clientKey) {
* @return base64 encoded String
*/
String WebSockets::base64_encode(uint8_t * data, size_t length) {
size_t size = ((length*1.6f)+1);
size_t size = ((length * 1.6f) + 1);
char * buffer = (char *) malloc(size);
if(buffer) {
base64_encodestate _state;
@@ -439,28 +511,46 @@ String WebSockets::base64_encode(uint8_t * data, size_t length) {
* @param n size_t byte count
* @return true if ok
*/
bool WebSockets::readWait(WSclient_t * client, uint8_t *out, size_t n) {
bool WebSockets::readCb(WSclient_t * client, uint8_t * out, size_t n, WSreadWaitCb cb) {
#if (WEBSOCKETS_NETWORK_TYPE == NETWORK_ESP8266_ASYNC)
client->tcp->readBytes(out, n, std::bind([](WSclient_t * client, bool ok, WSreadWaitCb cb) {
if(cb) {
cb(client, ok);
}
}, client, std::placeholders::_1, cb));
#else
unsigned long t = millis();
size_t len;
DEBUG_WEBSOCKETS("[readCb] n: %d t: %d\n", n, t);
while(n > 0) {
if(!client->tcp) {
DEBUG_WEBSOCKETS("[readWait] tcp is null!\n");
if(client->tcp == NULL) {
DEBUG_WEBSOCKETS("[readCb] tcp is null!\n");
if(cb) {
cb(client, false);
}
return false;
}
if(!client->tcp->connected()) {
DEBUG_WEBSOCKETS("[readWait] not connected!\n");
DEBUG_WEBSOCKETS("[readCb] not connected!\n");
if(cb) {
cb(client, false);
}
return false;
}
if((millis() - t) > WEBSOCKETS_TCP_TIMEOUT) {
DEBUG_WEBSOCKETS("[readWait] receive TIMEOUT!\n");
DEBUG_WEBSOCKETS("[readCb] receive TIMEOUT! %d\n", (millis() - t));
if(cb) {
cb(client, false);
}
return false;
}
if(!client->tcp->available()) {
#ifdef ESP8266
#if (WEBSOCKETS_NETWORK_TYPE == NETWORK_ESP8266)
delay(0);
#endif
continue;
@@ -475,9 +565,14 @@ bool WebSockets::readWait(WSclient_t * client, uint8_t *out, size_t n) {
} else {
//DEBUG_WEBSOCKETS("Receive %d left %d!\n", len, n);
}
#ifdef ESP8266
#if (WEBSOCKETS_NETWORK_TYPE == NETWORK_ESP8266)
delay(0);
#endif
}
if(cb) {
cb(client, true);
}
#endif
return true;
}