[vibe.d] WebSocket mess with "cast(shared) ..." #2740

Open
madkote opened this issue Aug 9, 2023 · 1 comment
madkote commented Aug 9, 2023

Hi,
the idea is to perform a heavy calculation in a worker thread and send the results directly back to the client over the WS connection.

Note that connection close is handled, so the WS connection stays open while the worker thread is executing.

Maybe I am doing something wrong here? I suspect there is some mess with references to the WebSocket object.

On the first run (clients connect to the server), only one or two clients fail to receive all results. On the second attempt to connect to the server, clients NEVER receive any results from the worker threads (which are created anew on each connection).

/*
 * Case 01
 * -------
 * Worker task is executed always in same worker thread.
 *
 * How-To:
 * - open 4 parallel WS connection to service.
 * - in each connection send channel id, eg. "ch01", "ch02", ...
 * - observe logs
 * - worker thread name is always same
 * - worker cannot write to WS?
 */

import vibe.d;
import vibe.vibe;
import vibe.core.core;
import vibe.http.server;
import vibe.http.websockets;
import vibe.http.router;
import vibe.inet.url;

import core.time;
import core.thread : Thread;
import std.conv;

static void workerFuncPingPongWS(Task caller, string channel_id, shared WebSocket s) nothrow {
	WebSocket ws = cast(WebSocket) s;
	int counter = 5;
	try {
		logInfo("WORKER :: thread-id=%s caller=%s channel-id=%s THREAD=%s", thisTid, caller, channel_id, Thread.getThis().name);
		while (receiveOnly!string() == "ping" && --counter) {
			logInfo("%s :: %s :: pong=%s", Thread.getThis().name, channel_id, counter);
			try {
				ws.send("pong-" ~ channel_id ~ "-" ~ Thread.getThis().name);
			} catch (Exception o) {
				logError(">>> exception=%s", o);
			}
			caller.send("pong");
			sleep(2.seconds);
		}
		caller.send("goodbye");
	} catch (Exception e) assert(false, e.msg);
}

class WebsocketService {
	@path("/ws") void getWebsocket1(scope WebSocket ws){
		logInfo("X> connected=%s, ws=%s code=%s THREAD=%s", ws.connected, &ws, ws.closeCode, Thread.getThis().name);
		
		auto channel_id = ws.receiveText;
		logInfo("Receive channel '%s'.", channel_id);

		auto callee = runWorkerTaskH(&workerFuncPingPongWS, Task.getThis, channel_id, cast(shared) ws);
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
		
		while (true) {
			auto txt = ws.receiveText;
			logInfo("Receive '%s'. thisTid=%s", txt, thisTid);
			
			if (txt == "stop") {
				break;
			}
			ws.send(txt ~ " pong");
		}
		logInfo("Client disconnected - worker is done. THREAD=%s", Thread.getThis().name);
	}
}

void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{	
    res.writeBody("Hello");
}

void main()
{
	logInfo("APP::CASE::01");
	auto router = new URLRouter;
	router.registerWebInterface(new WebsocketService());
	router.get("/hello", &helloWorld);

	auto settings = new HTTPServerSettings;
	settings.port = 8080;
	settings.bindAddresses = ["::1", "127.0.0.1"];

	auto listener = listenHTTP(settings, router);
	scope (exit)
	{
		listener.stopListening();
	}

	runApplication();
}

Server console output:

[main(----) INF] APP::CASE::01
[main(----) INF] Listening for requests on http://[::1]:8080/
[main(----) INF] Listening for requests on http://127.0.0.1:8080/
[main(E7UW) INF] X> connected=true, ws=7F34D6BA1DD8 code=0 THREAD=main
[main(0Bjv) INF] X> connected=true, ws=7F34D8BA3DD8 code=0 THREAD=main
[main(0MxP) INF] X> connected=true, ws=7F34D7BA2DD8 code=0 THREAD=main
[main(bsKy) INF] X> connected=true, ws=7F34D5BA0DD8 code=0 THREAD=main

[main(E7UW) INF] Receive channel '2-fibonacci'.
[main(0Bjv) INF] Receive channel '0-fibonacci'.
[main(0MxP) INF] Receive channel '3-fibonacci'.
[main(bsKy) INF] Receive channel '1-fibonacci'.

[vibe-15(biLB) INF] WORKER :: thread-id=Tid(7f34db0876e0) caller=7F34DB074C00:1 channel-id=2-fibonacci THREAD=vibe-15
[vibe-4(tmZQ) INF] WORKER :: thread-id=Tid(7f34db087840) caller=7F34DB074A00:1 channel-id=3-fibonacci THREAD=vibe-4
[vibe-15(AXdw) INF] WORKER :: thread-id=Tid(7f34db087790) caller=7F34DB074800:3 channel-id=0-fibonacci THREAD=vibe-15
[vibe-15(DCEi) INF] WORKER :: thread-id=Tid(7f34db0878f0) caller=7F34DB074E00:1 channel-id=1-fibonacci THREAD=vibe-15

>>> exception=object.Exception@../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d(819): Error writing data to socket.
----------------
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d:819 @safe ulong vibe.core.net.TCPConnection.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d143fca]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-302:302 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.Stream).InterfaceProxy.ProxyImpl!(vibe.core.net.TCPConnection).ProxyImpl.__mixin8.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin2.write(scope void[], scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cf30395]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-196:196 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.OutputStream).InterfaceProxy.__mixin22.__mixin3.__mixin2.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cfcf442]
../../.dub/packages/vibe-d/0.9.6/vibe-d/stream/vibe/stream/wrapper.d:199 @safe ulong vibe.stream.wrapper.ConnectionProxyStream.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d12ce3b]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/stream.d:305 @safe void vibe.core.stream.OutputStream.write(scope const(ubyte)[]) [0x55db0d14d69e]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:959 @safe void vibe.http.websockets.OutgoingWebSocketMessage.sendFrame(bool) [0x55db0d03a211]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:942 @safe void vibe.http.websockets.OutgoingWebSocketMessage.finalize() [0x55db0d03a044]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:688 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3() [0x55db0d03b44c]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/sync.d:189 @safe void vibe.core.sync.performLocked!(vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3(), vibe.core.sync.InterruptibleTaskMutex).performLocked(vibe.core.sync.InterruptibleTaskMutex) [0x55db0d03b361]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:685 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode) [0x55db0d038fc3]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:657 @safe void vibe.http.websockets.WebSocket.send(scope const(char)[]) [0x55db0d038ef3]
source/app_forum_01_task_thread.d:33 nothrow void app_forum_01_task_thread.workerFuncPingPongWS(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cf1b209]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/taskpool.d:211 nothrow void vibe.core.taskpool.TaskPool.doRunTaskH!(void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).doRunTaskH(vibe.core.task.TaskSettings, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).taskFun(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cee96d3]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:737 nothrow void vibe.core.task.TaskFuncInfo.set!(void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).set(ref void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, ref void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).callDelegate(ref vibe.core.task.TaskFuncInfo) [0x55db0ceec2f6]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:758 void vibe.core.task.TaskFuncInfo.call() [0x55db0d15edc5]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:457 nothrow void vibe.core.task.TaskFiber.run() [0x55db0d15dfbe]
??:? void core.thread.context.Callable.opCall() [0x55db0d1e3de8]
??:? fiber_entryPoint [0x55db0d23fe1b]

...

Client console output. As you can see, channel 0-fibonacci did not receive any results from the worker thread!

...

3-fibonacci results:
['pong-3-fibonacci-vibe-4',
 'pong-3-fibonacci-vibe-4',
 'pong-3-fibonacci-vibe-4',
 'pong-3-fibonacci-vibe-4',
 'fibonacci pong',
 '0 pong',
 '1 pong',
 '2 pong',
 '3 pong',
 '4 pong']


2-fibonacci results:
['pong-2-fibonacci-vibe-15',
 'pong-2-fibonacci-vibe-15',
 'pong-2-fibonacci-vibe-15',
 'pong-2-fibonacci-vibe-15',
 'fibonacci pong',
 '0 pong',
 '1 pong',
 '2 pong',
 '3 pong',
 '4 pong']

0-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']

1-fibonacci results:
['pong-1-fibonacci-vibe-15',
 'pong-1-fibonacci-vibe-15',
 'pong-1-fibonacci-vibe-15',
 'pong-1-fibonacci-vibe-15',
 'fibonacci pong',
 '0 pong',
 '1 pong',
 '2 pong',
 '3 pong',
 '4 pong']

Client console output on the 2nd attempt (the server worker threads log only errors when sending data). Here none of the clients received any results from the worker threads.

1-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']

3-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']

2-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']

0-fibonacci results:
['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']

s-ludwig (Member) commented

The cast(shared) ws is most likely the problem here. Event-based objects are generally tied to the thread that created them; in this case that is the underlying TCP socket over which the WebSocket communicates. Using such an object from another thread generally leads to undefined behavior and, depending on the event driver/OS, might actually appear to work to some extent.

So to fix this, the worker task/thread should only perform the actual computation and then report the result back to the original task. Likewise, if any intermediate communication is required, it should be channeled to the task that owns the WebSocket, using something like message passing or vibe.core.channel, and sent from there.
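
A minimal sketch of that pattern, assuming the same vibe.d setup as in the report and reusing only calls that already appear there (runWorkerTaskH, send, receiveOnly, registerWebInterface). The names fibonacci, computeWorker, and the "done" sentinel are hypothetical and only serve as illustration. It has not been run against the reporter's environment.

```d
import vibe.vibe;

import std.conv : to;

// Hypothetical stand-in for the heavy calculation.
ulong fibonacci(uint n) pure nothrow @safe
{
	ulong a = 0, b = 1;
	foreach (_; 0 .. n) {
		immutable next = a + b;
		a = b;
		b = next;
	}
	return a;
}

// Runs in a worker thread. It only computes and reports back to the
// owner task; it never receives or touches the WebSocket.
void computeWorker(Task owner, string channelId) nothrow
{
	try {
		foreach (uint n; 0 .. 5)
			owner.send("result-" ~ channelId ~ "-" ~ fibonacci(20 + n).to!string);
		owner.send("done"); // hypothetical end-of-results sentinel
	} catch (Exception e) assert(false, e.msg);
}

class WebsocketService {
	@path("/ws") void getWebsocket(scope WebSocket ws)
	{
		auto channelId = ws.receiveText;

		// Only the task handle and the channel id cross the thread boundary.
		runWorkerTaskH(&computeWorker, Task.getThis, channelId);

		// The task that owns the WebSocket forwards each result, so every
		// ws.send happens on the thread that owns the underlying socket.
		while (true) {
			auto msg = receiveOnly!string();
			if (msg == "done") break;
			ws.send(msg);
		}
		logInfo("All results for '%s' forwarded.", channelId);
	}
}

void main()
{
	auto router = new URLRouter;
	router.registerWebInterface(new WebsocketService());

	auto settings = new HTTPServerSettings;
	settings.port = 8080;
	settings.bindAddresses = ["::1", "127.0.0.1"];

	auto listener = listenHTTP(settings, router);
	scope (exit) listener.stopListening();

	runApplication();
}
```

Compared with the original handler, no cast(shared) is needed because the WebSocket never leaves its task. The forwarding loop here is sequential, so a handler that must also react to client messages while results arrive would need a different arrangement.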
