Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket Accept/WriteFrame/WriteMessage not processing more than 2 writes #280

Open
akumaburn opened this issue Apr 27, 2023 · 9 comments

Comments

@akumaburn
Copy link

akumaburn commented Apr 27, 2023

So it seems if you try to write to a web-socket repeatedly only 2 messages are actually being transmitted. With the rest being lost somehow.

Given an AsyncServlet mapped like so:

@Provides AsyncServlet servlet() {
        return RoutingServlet.create().mapWebSocket("/socket/stream", this::handleSocket);
}

And a handleSocket method like so:

private void handleSocket(WebSocket socket) {
    // Do some reading..
    // ..
    // Do some writing...
   for(int i = 0; i < 10; i ++) {
            socket.writeMessage(WebSocket.Message.text("some message: " + i + " "));
   }
}

I've confirmed with Wireshark on Linux that only 2 packets with the frames/messages are actually being transmitted, and everything else seems to be completely lost. I've tried sending frames instead of messages and correctly marking the last frame and the result is still the same. I've also tried with the messageWriteChannel.accept and still no luck.

The result of the above is similar to:

some message: 0 some message: 9

When something more like this was expected:

some message: 0 some message: 1 some message: 2 some message: 3 some message: 4 some message: 5 some message: 6 some message: 7 some message: 8 some message: 9

@eduard-vasinskyi
Copy link
Contributor

Hi, @akumaburn

ActiveJ WebSockets are asynchronous. Its methods like readMessage/writeMessage return promises, which means that execution occurs not right away but some time later.

Javadoc of these methods says: All writeXXX methods should be called serially. This means that when you write messages to WebSocket you need to wait for the previous write-promise to complete before sending the next message. There are several useful helper methods in the Promises class that allow writing asynchronous loops. You can rewrite your for-loop using Promises#loop, for example:

Promises.loop(
     0,
     i -> i < 10,
     i -> webSocket.writeMessage(Message.text("some message:" + i + " "))
           .map($ -> i + 1)
);

@akumaburn
Copy link
Author

akumaburn commented Apr 28, 2023

@eduard-vasinskyi How do you explicitly wait for a promise to complete before sending the next one, i tried your example and its working but I still haven't wrapped my head around this, for example this doesn't work:

        ArrayList<Promise<Void>> promiseList = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            WebSocket.Message
                    currentMsg =
                    WebSocket.Message.text(System.currentTimeMillis() + " - some message: " + i);
            Promise<Void> currentPromise = Router.socket.writeMessage(currentMsg);
            promiseList.add(currentPromise);
        }

        return Promises.all(promiseList);

I'm trying to understand how to do the generic case of write, writenext, writenext.. instead of depending on a specific helper method.

EDIT: I was able to get it to work like this, but would like confirmation that this is the correct way:

        Promise<Void> currentPromise = null;

        for (int i = 0; i < 10; i++) {
            WebSocket.Message
                    currentMsg =
                    WebSocket.Message.text(System.currentTimeMillis() + " - some message: " + i);
            if(currentPromise == null) {
                currentPromise = Router.socket.writeMessage(currentMsg);
            } else {
                currentPromise = currentPromise.then($ -> Router.socket.writeMessage(currentMsg));
            }
        }

        return currentPromise;

@akumaburn
Copy link
Author

@eduard-vasinskyi Essentially I'm trying to understand how to stream a response to web socket, I want to send each piece on its own, but it seems the current setup will only send them all at once. Does this mean I can't use a websocket and must return to polling?

@eduard-vasinskyi
Copy link
Contributor

@akumaburn

ActiveJ uses a reactive approach similar to the one used in Java’s CompletableFuture and JavaScript promises.

Trying to implement looping logic manually in asynchronous context is often error-prone. We recommend using methods from the Promises class: loop(), until(), etc.

Alternatively, you can use WebSocket CSP API. There are methods like WebSocket#frameReadChannel, WebSocket#messageReadChannel, WebSocket#frameWriteChannel, WebSocket#messageWriteChannel.

These methods return channel suppliers and consumers that are sequential by design. For your use-case you can obtain message write channel (ChannelConsumer<Message>) from the WebSocket and then stream your messages to the channel consumer:

return ChannelSuppliers.ofStream(IntStream.range(0, 10)
            .mapToObj(i -> Message.text(System.currentTimeMillis() + " - some message: " + i)))
      .streamTo(webSocket.messageWriteChannel());

I did not get the part about “the current setup will only send them all at once”. Are you referring to web socket messages? Do you mean that web socket messages are sent as a single network packet or something else?

@akumaburn
Copy link
Author

akumaburn commented May 2, 2023

@eduard-vasinskyi I meant the processing of promises on the socket writes seems to happen all at once except for the very first promise, for example:

EG: I expect the ASYNC processing to be like this for a 5 second total generation time

1, 0.001s
2, 0.002s
3, 0.003s
4, 1.003s
5, 4.003s

Where 1,2,3 would be received virtually right away, and then 4 would be received a second later, and then 5 would be received when the full time has elapsed.

Whereas the current behavior seems to be more like:

1, 0.001s
2, 4.001s
3, 4.002s
4, 4.003s
5, 4.004s

Where 1 would be received virtually right away, and then 2,3,4,5 would be received when the full time has elapsed. Despite 2,3,4 not taking as long to execute as 5.

The reason I don't want to use the built loop/until functionality is because there is quite a bit more complex logic than I'm indicating above.

@akumaburn
Copy link
Author

akumaburn commented May 5, 2023

@eduard-vasinskyi

I've not been able to find a solution, I did try to work around this problem by using AsyncExecutors but it seems that they hang when trying to write to the existing WebSocket object, is it possible to access the WebSocket from outside the current reactor thread?

This is my current handler class - please let me know what I'm doing wrong:


import io.activej.async.process.AsyncExecutor;
import io.activej.async.process.AsyncExecutors;
import io.activej.http.IWebSocket;
import io.activej.promise.Promise;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class WebSocketMessageHandler {
    private final IWebSocket                webSocket;
    private final Queue<IWebSocket.Message> messageQueue    = new LinkedList<>();
    private final AtomicBoolean             isSending       = new AtomicBoolean(false);
    private final ExecutorService           executorService = Executors.newSingleThreadExecutor();

    public WebSocketMessageHandler(IWebSocket webSocket) {
        this.webSocket = webSocket;
        startMessageSendingLoop();
    }

    public void enqueueMessage(IWebSocket.Message message) {
        synchronized (messageQueue) {
            messageQueue.add(message);
        }
    }

    private void startMessageSendingLoop() {
        executorService.submit(() -> {
            while (true) {
                IWebSocket.Message message;

                synchronized (messageQueue) {
                    message = messageQueue.poll();
                }

                if (message != null) {
                    AsyncExecutor asyncExecutor = AsyncExecutors.buffered(1);

                    asyncExecutor.execute(() -> {
                        if (isSending.compareAndSet(false, true)) {
                            return webSocket.writeMessage(message)
                                            .whenComplete(($, e) -> {
                                                if (e != null) {
                                                    e.printStackTrace();
                                                }
                                                isSending.set(false);
                                            });
                        }
                        return Promise.complete();
                    }).getResult();
                } else {
                    try {
                        Thread.sleep(100); // Adjust the sleep duration as needed
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

cc @dvolvach

@eduard-vasinskyi
Copy link
Contributor

@akumaburn
Can you provide a code and explain how you measured that timings? I will try to investigate where the problem is.

As for your workaround, you cannot use WebSocket outside of the reactor's thread. Your example probably hangs as IWebSocket#writeMessage throws an exception while checking whether it is called inside a reactor thread. An exception is lost as the result of ExecutionService#submit is ignored.

You can split your problem into two subtasks.

First is how to execute external code in relation to a reactor. You can do that by using Promise#ofBlocking methods that run blocking code in some Executor and return a result of execution as a Promise.

The next subtask is to write messages to a WebSocket. As I said earlier, you can use asynchronous looping methods like Promises#loop or Promises#until.

Alternatively, you can use Promise#sequence method to send a sequence of messages to the WebSocket:

public Promise<Void> writeMessages(List<Message> messages, IWebSocket webSocket) {
  return Promises.sequence(messages.stream()
        .map(message -> () -> webSocket.writeMessage(message)));
}

Another approach is to write your own asynchronously recursive method. Something like this:

public Promise<Void> writeMessages(List<Message> messages, IWebSocket webSocket) {
  return Promise.ofCallback(cb -> doWrite(messages.iterator(), webSocket, cb));
}


private void doWrite(Iterator<Message> iterator, IWebSocket webSocket, SettableCallback<Void> cb) {
  if (!iterator.hasNext()) {
     cb.set(null);
     return;
  }


  webSocket.writeMessage(iterator.next())
        .whenResult(() -> doWrite(iterator, webSocket, cb))
        .whenException(cb::setException);
}

All those approaches are common patterns in asynchronous reactive programming, using ObservableFutures/Promises/Callbacks alike.

@akumaburn
Copy link
Author

akumaburn commented May 8, 2023

@eduard-vasinskyi I measured the performance using the network tab of Firefox and looking at the timings on the web socket messages. You can try creating a simple web socket client in JavaScript and then pointing it to the websocket url provided to ActiveJ, on firefox's network response you should be able to see the messages as they come in along with their corresponding message timing:
image

I understand keeping with patterns, but the design denies access to a websocket unless we are currently in that websocket's Reactor thread, in the case where we need to do some off-thread processing this will prevent us from being able to write to the websocket??

To be clear these messages are being generated and we don't know their length, number or their timing. Just imagine we have an infinite amount of messages to write, how could we possibly use a list?

I'm still not very clear on how to run code in a specific reactor's thread, you mentioned Promise#ofBlocking but that takes an Executor as an input not an AsyncExecutor

Assume I'm setting my Reactor in a static variable called Router.reactor which is set by:

@Provides NioReactor reactor() {
    Router.reactor = Eventloop.builder().withFatalErrorHandler(rethrow()).build();
    return Router.reactor;
}

Lets keep it simple, show me how to just print "Hello Word", in Router.reactor's thread from some other executor.

@akumaburn
Copy link
Author

akumaburn commented May 9, 2023

This is taking too long to troubleshoot, for now I've switched to https://github.com/TooTallNate/Java-WebSocket and it is working as expected out of the box. I believe I'll stay with ActiveJ for REST but use the former for web sockets specifically at least until ActiveJ websockets mature.

I feel it is important to stress that the entire point of websockets is to avoid having the overhead associated with REST. In that end the expectation for a websocket is to be opened once, and not closed until the user ends their session. The websocket should persist and be able to be written to by any thread, and currently it does not perform as such in ActiveJ, so that is my reason for switching.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants