Skip to content
This repository has been archived by the owner on Dec 18, 2023. It is now read-only.

Full duplex Ws transfer using channels #110

Open
wants to merge 89 commits into
base: develop
Choose a base branch
from

Conversation

ProphetLamb
Copy link
Collaborator

@ProphetLamb ProphetLamb commented Nov 1, 2022

Creates one channel for receiving (tx - server transfer). The channel has a associated producer and consumer.

Significant members

Class Purpose
WsRx Sends a stream over the ClientWebSocket
WsTxConsumer Pulls the tx channel and invokes registered handlers once a response has arrived. This supports multiple events, such as notification/live queries. The consumer uses a cache that evicts dead handles.
WsTxProvider Listens to blocks from ClientWebSocket and pumps the tx channel with messages. Messages can consist of multiple blocks, and are streamed. They are pumped as soon as the first part arrives. WsMessageReader handles multipart messages.

To-Do

  • Fix whatever causes messages to fall into the abyss.
  • Fix remaining failing testcases
  • WsMessageReader use a custom channel implementation, that is unbound, disposable using a pooled array.
  • Validate whether locks in WsMessageReader can be replaced with memory barriers of some kind.
  • Validate whether there are avoidable buffer dups.
  • Review, if the default settings for the RecyclableMemoryManagerManager is too aggressive.

Postscriptum: I kind of bricked my IDE by debugging tests, so I am clearly doing sth wrong kek
Postpostscriptum: Still in development, sometimes a message gets lost...
Postpostpostscriptum: thats what I get for cheating the initial implementation...

instead of (RspHeader Response, NtyHeader Notify, int Offset)  tuple
clips the length of a span the the maximum length
Listens for Messages and dispatches them by their headers to different handlers.
Remove useless DisposeAsync
Remove useless DisposeAsync
@ProphetLamb
Copy link
Collaborator Author

ProphetLamb commented Nov 5, 2022

Debug logging shows, that sometimes the Inflater repeats the last block indefinitely, until the channel is at capacity. Then it waits for the deflater indefinitely, because it is already disposed.


10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 65, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Finished receiving the message from the socket (E:MessageReceiveFinishedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Pushed the message to the channel (E:MessagePushedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 65, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Finished receiving the message from the socket (E:MessageReceiveFinishedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Pushed the message to the channel (E:MessagePushedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 62, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:1611

At some point the debug log stops, the line is not written to the end. The log is disposed by the DbHandle, therefore we can assume that the DbHandle was disposed. This is not the case, because the database is not killed. The remaining option is that the xunit thread is softlocked at this point.... Which would explain that the test does not terminate. Interestingly enough, this occurs during the flush of the FileStream, this is highly unlikely. All in all, very wired behaviour.

I observed, that the softlock only ever occurs at the last block of the last message in the unit test.

Note that the code is followed by a ct.ThrowIfCancellationRequested(); suggesting, that the cancellation is never requested, the on top of that, the client is never disposed, otherwise we expect a log message. This supports the scenario, that the main thread is softlocked.

// log that we are waiting for the socket
log.SocketWaiting();
// receive the first part
var result = await _socket.ReceiveAsync(buffer, ct).Inv();
// log that we have received a message from the socket
log.SockedReceived(result);

ct.ThrowIfCancellationRequested();

Why the ClientWebSocket repeats the last message continuously is unknown, it might be the case that the WebSocketDeflater interacts weirdly with the network stream... I do not know.

@ProphetLamb
Copy link
Collaborator Author

For now I reorder the ManagedWebSocket.Dispose after the WsReceiver*.Dispose

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant