Move proxy logic into a separate js class file

master
paradust7 2022-05-08 03:11:29 +00:00
parent da642dfda0
commit d380e360ab
9 changed files with 214 additions and 120 deletions

View File

@ -1,7 +1,19 @@
add_library(emsocket emsocket.cpp emsocketctl.cpp ProxyLink.cpp VirtualSocket.cpp)
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h;emsocketctl.h")
INSTALL(TARGETS emsocket
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include
add_custom_command(
OUTPUT proxyjs.gen.h
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/gen.sh
ARGS ${CMAKE_CURRENT_SOURCE_DIR}/proxy.js proxyjs.gen.h
MAIN_DEPENDENCY ${CMAKE_CURRENT_SOURCE_DIR}/proxy.js
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/gen.sh
)
add_custom_target(proxyjs ALL DEPENDS proxyjs.gen.h)
add_library(emsocket emsocket.cpp emsocketctl.cpp ProxyLink.cpp VirtualSocket.cpp)
set_target_properties(emsocket PROPERTIES PUBLIC_HEADER "emsocket.h;emsocketctl.h")
target_include_directories(emsocket PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
add_dependencies(emsocket proxyjs)
INSTALL(
TARGETS emsocket
LIBRARY DESTINATION lib
PUBLIC_HEADER DESTINATION include
)

View File

@ -48,100 +48,62 @@ extern "C" {
void proxylink_onmessage(void *thisPtr, const void *buf, size_t n);
}
EM_JS(int, setup_proxylink_websocket, (const char* url, void *thisPtr), {
if (!self.hasOwnProperty('mywebsockets')) {
self.mywebsockets = [null];
EM_JS(int, proxylink_new, (const char* ip, uint16_t port, bool udp, void *thisPtr), {
if (!self.hasOwnProperty('w_proxylink_onopen')) {
self.w_proxylink_onopen = Module.cwrap('proxylink_onopen', null, ['number']);
self.w_proxylink_onerror = Module.cwrap('proxylink_onerror', null, ['number']);
self.w_proxylink_onclose = Module.cwrap('proxylink_onclose', null, ['number']);
self.w_proxylink_onmessage = Module.cwrap('proxylink_onmessage', null, ['number', 'number', 'number']);
}
const ws = new WebSocket(UTF8ToString(url));
const index = mywebsockets.length;
mywebsockets.push(ws);
ws.binaryType = "arraybuffer";
ws.userEnabled = false;
ws.userBuffer = [];
ws.onopen = (e) => {
w_proxylink_onopen(thisPtr);
};
ws.onerror = (e) => {
w_proxylink_onerror(thisPtr);
};
ws.onclose = (e) => {
w_proxylink_onclose(thisPtr);
};
ws.onmessage = (e) => {
var len = e.data.byteLength;
const link = new ProxyLink(UTF8ToString(ip), port, udp);
link.onopen = () => { w_proxylink_onopen(thisPtr); };
link.onerror = () => { w_proxylink_onerror(thisPtr); };
link.onclose = () => { w_proxylink_onclose(thisPtr); };
link.onmessage = (data) => {
var len = data.byteLength;
// TODO: Get rid of this allocation
var buf = _malloc(len);
HEAPU8.set(new Uint8Array(e.data), buf);
HEAPU8.set(new Uint8Array(data), buf);
w_proxylink_onmessage(thisPtr, buf, len);
_free(buf);
};
return index;
return link.index;
});
EM_JS(void, send_proxylink_websocket, (int index, int isUser, const void *data, int len), {
const ws = mywebsockets[index];
const bdata = new Uint8Array(HEAPU8.subarray(data, data + len));
if (ws) {
if (isUser) {
if (ws.userEnabled) {
ws.send(bdata);
} else {
ws.userBuffer.push(bdata);
}
} else {
ws.send(bdata);
}
EM_JS(void, proxylink_send, (int index, const void *data, int len), {
const link = ProxyLink.get(index);
if (link) {
link.send(HEAPU8.subarray(data, data + len));
}
});
EM_JS(void, enable_user_proxylink_websocket, (int index), {
const ws = mywebsockets[index];
if (ws) {
ws.userEnabled = true;
if (ws.userBuffer.length > 0) {
for (const bdata of ws.userBuffer) {
ws.send(bdata);
}
ws.userBuffer = [];
}
}
});
EM_JS(void, delete_proxylink_websocket, (int index), {
const ws = mywebsockets[index];
if (ws) {
delete mywebsockets[index];
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = null;
ws.close();
EM_JS(void, proxylink_close, (int index), {
const link = ProxyLink.get(index);
if (link) {
link.close();
}
});
namespace emsocket {
/*
* Wrapper around the javascript class of the same name
*/
class ProxyLink : public Link {
public:
ProxyLink() = delete;
ProxyLink(const ProxyLink &) = delete;
ProxyLink& operator=(const ProxyLink &) = delete;
ProxyLink(VirtualSocket *vs_, const std::string &proxyUrl, const SocketAddr &addr_, bool udp_)
: wsIndex(-1),
vs(vs_),
ProxyLink(VirtualSocket *vs_, const SocketAddr &addr_, bool udp_)
: vs(vs_),
addr(addr_),
udp(udp_),
sentProxyRequest(false),
receivedProxyAuth(false)
wsIndex(-1)
{
//std::cerr << "Initialized proxy websocket" << std::endl;
emsocket_run_on_io_thread(true, [this, proxyUrl]() {
wsIndex = setup_proxylink_websocket(proxyUrl.c_str(), this);
emsocket_run_on_io_thread(true, [this]() {
wsIndex = proxylink_new(addr.getIP().c_str(), addr.getPort(), udp, this);
});
assert(wsIndex > 0);
}
@ -158,7 +120,7 @@ public:
int wsIndex_ = wsIndex;
Packet *pkt = new Packet(SocketAddr(), data, len);
emsocket_run_on_io_thread(false, [wsIndex_, pkt]() {
send_proxylink_websocket(wsIndex_, 1, &pkt->data[0], pkt->data.size());
proxylink_send(wsIndex_, &pkt->data[0], pkt->data.size());
delete pkt;
});
}
@ -167,58 +129,27 @@ public:
// Called from I/O thread
void onopen() {
// Send a proxy request
char buf[128];
sprintf(buf, "PROXY IPV4 %s %s %u", (udp ? "UDP" : "TCP"), addr.getIP().c_str(), addr.getPort());
send_proxylink_websocket(wsIndex, 0, buf, strlen(buf));
//std::cerr << "Sent websocket PROXY handshake" << std::endl;
sentProxyRequest = true;
vs->linkConnected();
}
// Called from I/O thread
void onerror() {
//std::cerr << "ProxyLink got websocket error" << std::endl;
hangup();
}
// Called from I/O thread
void onclose() {
//std::cerr << "ProxyLink got websocket close" << std::endl;
hangup();
}
// Called from I/O thread
void onmessage(const void *buf, int n) {
if (!sentProxyRequest) {
//std::cerr << "ProxyLink got invalid message before proxy request" << std::endl;
hangup();
return;
}
if (!receivedProxyAuth) {
// Check for proxy auth
if (n > 16) {
//std::cerr << "ProxyLink unexpected auth message length (" << n << ")" << std::endl;
hangup();
return;
}
std::string response((const char*)buf, n);
if (response == "PROXY OK") {
receivedProxyAuth = true;
enable_user_proxylink_websocket(wsIndex);
vs->linkConnected();
return;
}
//std::cerr << "ProxyLink received bad auth: '" << response << "' of length " << n << std::endl;
hangup();
return;
}
// Regular message
vs->linkReceived(addr, buf, n);
}
// Called from I/O thread
void hangup() {
delete_proxylink_websocket(wsIndex);
proxylink_close(wsIndex);
vs->linkShutdown();
}
private:
@ -226,12 +157,10 @@ private:
SocketAddr addr;
bool udp;
int wsIndex;
bool sentProxyRequest;
bool receivedProxyAuth;
};
Link* make_proxy_link(VirtualSocket* vs, const std::string &proxyUrl, const SocketAddr &addr, bool udp) {
return new ProxyLink(vs, proxyUrl, addr, udp);
Link* make_proxy_link(VirtualSocket* vs, const SocketAddr &addr, bool udp) {
return new ProxyLink(vs, addr, udp);
}
} // namespace

View File

@ -30,6 +30,6 @@ namespace emsocket {
class VirtualSocket;
Link* make_proxy_link(VirtualSocket* vs, const std::string &proxyUrl, const SocketAddr &addr, bool udp);
Link* make_proxy_link(VirtualSocket* vs, const SocketAddr &addr, bool udp);
} // namespace

View File

@ -50,6 +50,14 @@ public:
}
}
bool isIPv4() const {
return true; // Only one supported at the moment
}
bool isIPv6() const {
return false;
}
std::string getIP() const {
char buf[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &(sin.sin_addr), buf, sizeof(buf));

View File

@ -183,7 +183,7 @@ bool VirtualSocket::startConnect(const SocketAddr &dest) {
if (dest.isLocalHost()) {
std::cerr << "emsocket local TCP not yet supported" << std::endl;
return false;
} else if (strlen(emsocket_get_proxy()) > 0) {
} else {
assert(!is_udp);
assert(!link);
assert(!is_connected);
@ -192,7 +192,7 @@ bool VirtualSocket::startConnect(const SocketAddr &dest) {
bind(SocketAddr()); // bind to random port
}
remoteAddr = dest;
link = make_proxy_link(this, emsocket_get_proxy(), dest, is_udp);
link = make_proxy_link(this, dest, is_udp);
return true;
}
std::cerr << "emsocket no proxy set" << std::endl;
@ -273,7 +273,7 @@ void VirtualSocket::sendto(const void *buf, size_t n, const SocketAddr& to) {
}
} else {
remoteAddr = to;
link = make_proxy_link(this, emsocket_get_proxy(), to, is_udp);
link = make_proxy_link(this, to, is_udp);
}
link->send(buf, n);
}

View File

@ -31,9 +31,9 @@ SOFTWARE.
#include <condition_variable>
#include <pthread.h>
#include <emscripten.h>
#include "proxyjs.gen.h"
namespace emsocket {
std::string currentProxyUrl;
bool didInit;
pthread_t ioThread;
std::mutex ioMutex;
@ -58,12 +58,16 @@ void emsocket_init(void) {
}
}
void emsocket_set_proxy(const char *proxyUrl) {
currentProxyUrl = proxyUrl ? proxyUrl : "";
}
EM_JS(void, _set_proxy, (const char *url), {
setProxy(UTF8ToString(url));
});
const char* emsocket_get_proxy(void) {
return currentProxyUrl.c_str();
void emsocket_set_proxy(const char *url) {
char *urlcopy = strdup(url);
emsocket_run_on_io_thread(false, [urlcopy]() {
_set_proxy(urlcopy);
free(urlcopy);
});
}
@ -82,6 +86,10 @@ static void io_thread_reenter(void) {
}
static void *io_thread_main(void *) {
init_proxyjs();
// TODO: emsocket_run_on_io_thread should use a WebWorker
// message to wakeup the I/O thread instead of polling
// every 10ms.
emscripten_set_main_loop(io_thread_reenter, 100, EM_TRUE);
abort(); // unreachable
}

View File

@ -29,7 +29,6 @@ extern "C" {
void emsocket_init(void);
void emsocket_set_proxy(const char *proxyUrl);
const char* emsocket_get_proxy(void);
#ifdef __cplusplus
}

10
src/emsocket/gen.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
echo "CALLED WITH $1 $2"
INPUT_FILE="$1"
OUTPUT_FILE="$2"
echo 'EM_JS(void, init_proxyjs, (), {' > "$OUTPUT_FILE"
cat "$INPUT_FILE" >> "$OUTPUT_FILE"
echo "});" >> "$OUTPUT_FILE"

128
src/emsocket/proxy.js Normal file
View File

@ -0,0 +1,128 @@
self.proxyUrl = "";
function setProxy(url) {
self.proxyUrl = url;
}
self.setProxy = setProxy;
self.textEncoder = new TextEncoder();
self.textDecoder = new TextDecoder();
class ProxyLink {
constructor(ip, port, udp) {
this.ip = ip;
this.port = port;
this.udp = udp;
this.onopen = null;
this.onerror = null;
this.onclose = null;
this.onmessage = null;
this.sentProxyRequest = false;
this.userEnabled = false;
this.userBuffer = [];
this.index = ProxyLink.links.length;
ProxyLink.links.push(this);
const ws = new WebSocket(proxyUrl);
this.ws = ws;
ws.binaryType = "arraybuffer";
ws.onopen = this.handleOpen.bind(this);
ws.onerror = this.handleError.bind(this);
ws.onclose = this.handleClose.bind(this);
ws.onmessage = this.handleMessage.bind(this);
}
handleOpen() {
// Send proxy request
const req = `PROXY IPV4 ${this.udp ? "UDP" : "TCP"} ${this.ip} ${this.port}`;
this.ws.send(textEncoder.encode(req));
this.sentProxyRequest = true;
}
handleError() {
if (this.onerror) {
this.onerror();
}
}
handleClose() {
this._close();
}
handleMessage(e) {
if (!this.userEnabled) {
// Waiting for proxy auth message
if (!this.sentProxyRequest) {
console.log("Got invalid message before proxy request");
this._close();
return;
}
const ok = textDecoder.decode(e.data);
if (ok != "PROXY OK") {
console.log("Got invalid proxy message");
this._close();
return;
}
//console.log("Got proxy OK");
this._enable();
this.onopen();
return;
}
//console.log("Relaying message");
this.onmessage(e.data);
}
send(data) {
//console.log("Got send");
const ws = this.ws;
if (!ws) return;
// If this copy isn't done, send fails with:
// "The provided ArrayBufferView value must not be shared."
data = new Uint8Array(data);
if (this.userEnabled) {
//console.log("Sending direct");
ws.send(data);
} else {
//console.log("Sending to userBuffer");
this.userBuffer.push(data);
}
}
_enable() {
this.userEnabled = true;
for (const data of this.userBuffer) {
//console.log("Flushing from buffer");
this.ws.send(data);
}
this.userBuffer = null;
}
// Call this internally. It will dispatch callbacks.
_close() {
const ws = this.ws;
if (ws) {
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = null;
ws.close();
this.ws = null;
}
if (this.onclose) {
this.onclose();
}
}
// This should only be called externally, because it does not
// invoke callbacks, and removes the index from the list.
close() {
this.onopen = null;
this.onerror = null;
this.onclose = null;
this.onmessage = null;
this._close();
delete ProxyLink.links[this.index];
}
static get(index) {
return ProxyLink.links[index];
}
}
self.ProxyLink = ProxyLink;
// minify can't handle inline static members, so declare it here.
self.ProxyLink.links = [null]; // 0 considered an invalid index