Skip to content

Commit

Permalink
Merge pull request #43 from Danconnolly/handle-in-progress
Browse files Browse the repository at this point in the history
Concurrency fixes and cleanup in NetworkController
  • Loading branch information
Danconnolly committed Apr 30, 2023
2 parents dbf2f28 + 6abf931 commit 18a307d
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 222 deletions.
187 changes: 86 additions & 101 deletions net/src/main/java/io/bitcoinsv/bsvcl/net/network/NetworkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ public boolean hasExpired(long refTimestamp) {

// The EventBus used for event handling
private final EventBus eventBus;
// An executor Service, to trigger jobs in MultiThread...
ExecutorService jobExecutor = ThreadUtils.getCachedThreadExecutorService("JclNetworkHandler");

// active connections
private final Map<PeerAddress, SelectionKey> activeConns = new ConcurrentHashMap<>();
Expand All @@ -113,24 +111,24 @@ public NetworkController(String id, RuntimeConfig runtimeConfig, P2PConfig netCo
*/
public void openConnection(PeerAddress peerAddress) throws InterruptedException {
if (serviceState.isStarting() || serviceState.isCreated()) { startLatch.await(); }
handleConnectionToOpen(peerAddress);
}

public void closeConnection(PeerAddress peerAddress) {
closeConnection(peerAddress, PeerDisconnectedEvent.DisconnectedReason.DISCONNECTED_BY_LOCAL);
}

/** close a connection to the peer */
public void closeConnection(PeerAddress peerAddress, PeerDisconnectedEvent.DisconnectedReason reason) {
try {
lock.writeLock().lock();
logger.debug("{} : {} : Closing connection...", this.id, peerAddress);
SelectionKey key = activeConns.get(peerAddress);
if (key != null) {
closeKey(key, reason);
}
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
SocketAddress socketAddress = new InetSocketAddress(peerAddress.getIp(), peerAddress.getPort());
socketChannel.connect(socketAddress);

// potential for race condition here: as soon as we add it to inProgressConns it may get handled
// but it has not been registered with mainSelector yet - so we need the writeLock
inProgressConns.put(peerAddress, new InProgressConn(peerAddress));
// note: we ignore the returned key - if the connection attempt is successful, then the
// key is added to activeConns by handleConnect()
socketChannel.register(mainSelector, SelectionKey.OP_CONNECT, new KeyConnectionAttach(peerAddress));
numConnsTried++;
logger.debug("{} : {} : Connecting...", this.id, peerAddress);
} catch (Exception e) {
logger.error("{} : {} : Error closing connection: {}", this.id, peerAddress, e.getMessage());
e.printStackTrace();
processConnectionFailed(peerAddress, PeerRejectedEvent.RejectedReason.INTERNAL_ERROR, e.getMessage());
} finally {
lock.writeLock().unlock();
}
Expand All @@ -147,9 +145,8 @@ public void acceptConnection(PeerAddress peerAddress, SocketChannel channel) {
logger.debug("{} : {} : Accepting connection...", this.id, peerAddress);
inProgressConns.put(peerAddress, new InProgressConn(peerAddress));

SelectionKey key = channel.register(mainSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
key.attach(new KeyConnectionAttach(peerAddress));

SelectionKey key = channel.register(mainSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE,
new KeyConnectionAttach(peerAddress));
logger.trace("{} : {} : Connected, establishing connection...", this.id, peerAddress);
startPeerConnection(key);
} catch (Exception e) {
Expand All @@ -160,6 +157,26 @@ public void acceptConnection(PeerAddress peerAddress, SocketChannel channel) {
}
}

/**
 * Closes the connection to the given peer, using {@code DISCONNECTED_BY_LOCAL} as the reason.
 *
 * @param peerAddress address of the peer to disconnect
 */
public void closeConnection(PeerAddress peerAddress) {
    final PeerDisconnectedEvent.DisconnectedReason localReason =
            PeerDisconnectedEvent.DisconnectedReason.DISCONNECTED_BY_LOCAL;
    closeConnection(peerAddress, localReason);
}

/**
 * Closes the active connection to the given peer, if there is one.
 * <p>
 * The peer is looked up in {@code activeConns} while holding the write lock; when no key is registered for
 * it, nothing is closed. Any exception raised while closing is logged and not propagated to the caller.
 *
 * @param peerAddress address of the peer to disconnect
 * @param reason      reason handed on to {@code closeKey}
 */
public void closeConnection(PeerAddress peerAddress, PeerDisconnectedEvent.DisconnectedReason reason) {
    // Acquire the lock BEFORE entering the try block, so the finally clause only ever releases a lock
    // that this thread actually holds.
    lock.writeLock().lock();
    try {
        logger.debug("{} : {} : Closing connection...", this.id, peerAddress);
        SelectionKey key = activeConns.get(peerAddress);
        if (key != null) {
            closeKey(key, reason);
        }
    } catch (Exception e) {
        // The exception is passed as the trailing argument so the stack trace is logged, not just the message.
        logger.error("{} : {} : Error closing connection: {}", this.id, peerAddress, e.getMessage(), e);
    } finally {
        lock.writeLock().unlock();
    }
}

public NetworkControllerState getStatus() {
NetworkControllerState result;
try {
Expand All @@ -186,21 +203,20 @@ public void run() {
logger.info("{} : Starting...", this.id);
try {
mainSelector = SelectorProvider.provider().openSelector();
startConnectionsJobs();
serviceState = ServiceState.RUNNING;
startLatch.countDown();
eventBus.publish(new NetStartEvent(this.peerAddress));

while (serviceState.isRunning() || serviceState.isPaused()) {
handleSelectorKeys(mainSelector);
handleInProgressConnections();
handleSelectorKeys(mainSelector); // this call has a blocking wait
}
} catch (Throwable e) {
logger.error("{} : Error running the NetworkController", this.id, e);
e.printStackTrace();
} finally {
serviceState = ServiceState.STOPPING;
eventBus.publish(new NetStopEvent());
stopConnectionsJobs();
closeAllKeys(mainSelector);
}
serviceState = ServiceState.STOPPED;
Expand Down Expand Up @@ -232,16 +248,6 @@ public void awaitStopped() {
}
}

/**
 * Submits {@code handleInProgressConnections} to the job executor, so the pending connections are
 * processed in a separate thread.
 */
private void startConnectionsJobs() {
    Runnable inProgressJob = this::handleInProgressConnections;
    jobExecutor.submit(inProgressJob);
}

/**
 * Shuts down the job executor used to process pending connections in a separate thread.
 * Does nothing when there is no executor.
 */
private void stopConnectionsJobs() {
    if (jobExecutor == null) {
        return;
    }
    jobExecutor.shutdown();
}

/**
* Logic to execute when a Connection to a Remote Peer has failed, so there is actually no connection at all.
* We just discard and blacklist this Peer.
Expand Down Expand Up @@ -318,73 +324,42 @@ private void startPeerConnection(SelectionKey key) {
}
}

/**
 * Opens an outgoing connection to the given peer and registers its channel with the main selector.
 * <p>
 * A non-blocking {@link SocketChannel} is created and connected. If the connection completes immediately,
 * the channel is registered for READ/WRITE and the peer connection is started straight away; otherwise it is
 * registered for CONNECT. In both cases the peer is recorded in {@code inProgressConns} first.
 * <p>
 * Nothing is thrown to the caller: any failure (controller not running, interruption while waiting for
 * start-up, I/O errors) is logged and reported through {@code processConnectionFailed} with
 * {@code INTERNAL_ERROR}.
 *
 * @param peerAddress Peer to connect to
 */
private void handleConnectionToOpen(PeerAddress peerAddress) {
    // Preconditions are checked BEFORE the write lock is taken. If they fail we must not reach an
    // unlock() call, since releasing a lock this thread does not hold throws IllegalMonitorStateException.
    try {
        if (serviceState != ServiceState.RUNNING && serviceState != ServiceState.STARTING) {
            throw new RuntimeException("Attempted to open connection while NetworkController is not running");
        }
        startLatch.await();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        logger.error("{} : {} : Error opening connection", this.id, peerAddress, e);
        processConnectionFailed(peerAddress, PeerRejectedEvent.RejectedReason.INTERNAL_ERROR, e.getMessage());
        return;
    }

    lock.writeLock().lock();
    try {
        numConnsTried++;
        logger.debug("{} : {} : Connecting...", this.id, peerAddress);
        inProgressConns.put(peerAddress, new InProgressConn(peerAddress));

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        SocketAddress socketAddress = new InetSocketAddress(peerAddress.getIp(), peerAddress.getPort());
        // connect() on a non-blocking channel returns true only if the connection was established immediately.
        boolean isConnected = socketChannel.connect(socketAddress);

        SelectionKey key = (isConnected)
                ? socketChannel.register(mainSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
                : socketChannel.register(mainSelector, SelectionKey.OP_CONNECT);

        key.attach(new KeyConnectionAttach(peerAddress));

        if (isConnected) {
            logger.trace("{} : {} : Connected, establishing connection...", this.id, peerAddress);
            startPeerConnection(key);
        } else {
            logger.trace("{} : {} : Connected, waiting for remote confirmation...", this.id, peerAddress);
        }
    } catch (Exception e) {
        logger.error("{} : {} : Error opening connection", this.id, peerAddress, e);
        processConnectionFailed(peerAddress, PeerRejectedEvent.RejectedReason.INTERNAL_ERROR, e.getMessage());
    } finally {
        lock.writeLock().unlock();
    }
}

/**
* Handle the in-progress connections. These connections have been opened from our end, but we are just waiting
* for the remote Peer to confirm (through a CONNECT Key in the KeySelector). This method loops over all these
* connections and remove those that are expired based on our config (timeoutSocketRemoteConfirmation)
* NOTE: An expired and removed connection might still confirm later on, sending a CONNECT signal to us. In
* that case, the connection is still accepted and inserted into the "active" connections.
* todo: make sure that the P2P parent is notified of any disconnects.
* <p>
* We need to lock here to prevent a potential race condition with openConnection()
* <p>
* todo:
* * is this necessary? isn't there a capability built into the NIO package for this?
* * do we have a test for this case?
* * make sure that the P2P parent is notified of any disconnects
*/
private void handleInProgressConnections() {
// We keep a temporary list where we keep a reference to those In-Progress Connections that need to be removed
// because they have expired...
List<PeerAddress> inProgressConnsToRemove = new ArrayList<>();
// loop over the InProgress Connections...
for (PeerAddress peerAddress : this.inProgressConns.keySet()) {
NetworkController.InProgressConn inProgressConn = this.inProgressConns.get(peerAddress);
if (inProgressConn.hasExpired(this.config.getTimeoutSocketRemoteConfirmation())) {
inProgressConnsToRemove.add(peerAddress);
try {
lock.writeLock().lock();
// We keep a temporary list where we keep a reference to those In-Progress Connections that need to be removed
// because they have expired...
List<PeerAddress> inProgressConnsToRemove = new ArrayList<>();
// loop over the InProgress Connections...
for (PeerAddress peerAddress : this.inProgressConns.keySet()) {
NetworkController.InProgressConn inProgressConn = this.inProgressConns.get(peerAddress);
if (inProgressConn.hasExpired(this.config.getTimeoutSocketRemoteConfirmation())) {
inProgressConnsToRemove.add(peerAddress);
}
} // for...
// we remove the expired connections...
if (!inProgressConnsToRemove.isEmpty()) {
logger.trace("{} : Removing {} in-progress expired connections", this.id, inProgressConnsToRemove.size());
numConnsInProgressExpired.addAndGet(inProgressConnsToRemove.size());
inProgressConnsToRemove.forEach(inProgressConns::remove);
inProgressConnsToRemove.clear();
}
} // for...
// we remove the expired connections...
if (!inProgressConnsToRemove.isEmpty()) {
logger.trace("{} : Removing {} in-progress expired connections", this.id, inProgressConnsToRemove.size());
numConnsInProgressExpired.addAndGet(inProgressConnsToRemove.size());
inProgressConnsToRemove.forEach(inProgressConns::remove);
inProgressConnsToRemove.clear();
} finally {
lock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -446,6 +421,9 @@ private boolean checkDuplicatedConnection(SelectionKey key, PeerAddress peerAddr

/**
* It performs a loop to handle the Selection Keys.
* <p>
* This method is only called by the object thread. It will therefore not conflict with other methods
* that are also only called by the object thread.
*/
private void handleSelectorKeys(Selector selector) throws IOException {
selector.select(100); // we need a timeout, otherwise it will block forever...
Expand All @@ -462,6 +440,9 @@ private void handleSelectorKeys(Selector selector) throws IOException {
* overridden by a child class in case we don't want to handle specific keys or handle new ones (like the
* ACCEPT key, which is not handled here since the NetworkHandlerImpl does not accept incoming connections)
* @param key Key to handle
* <p>
* This method is only called by the object thread. It will therefore not conflict with other methods
* that are also only called by the object thread.
*/
private void handleKey(SelectionKey key) throws IOException {
logger.trace("Key : " + key);
Expand All @@ -487,6 +468,11 @@ private void handleKey(SelectionKey key) throws IOException {

/**
* It handles a CONNECT key, that is a confirmation that a connection to a remote Peer is successful.
* <p>
* We need a lock here to prevent a potential race condition with openConnection().
* <p>
* This method is only called by the object thread. It will therefore not conflict with other methods
* that are also only called by the object thread.
*/
private void handleConnect(SelectionKey key) throws IOException {
try {
Expand Down Expand Up @@ -533,6 +519,9 @@ private void handleConnect(SelectionKey key) throws IOException {
* It handles a READ key, that is some data is ready to read from one connection. Internally, the stream
* representing this connection is used to read data from the socket (and return it to the "client" by using
* the "onData" method)
* <p>
* This method is only called by the object thread. It will therefore not conflict with other methods
* that are also only called by the object thread.
*/
private void handleRead(SelectionKey key) throws IOException {
// Read the data from the Peer (through the Stream wrapped out around it) and run the callbacks:
Expand All @@ -547,20 +536,16 @@ private void handleRead(SelectionKey key) throws IOException {
/**
* It handles a WRITE key, that is writing some data to the socket implementing that connection. Each connection
* is represented by a ByteArrayStream, so we use it to write the data through the channel.
* <p>
* This method is only called by the object thread. It will therefore not conflict with other methods
* that are also only called by the object thread.
*/
private void handleWrite(SelectionKey key) throws IOException {
// Write the data to the Peer (through the Stream wrapped out around it) and run the callbacks:
KeyConnectionAttach keyConnection = (KeyConnectionAttach) key.attachment();
// the checks below should not be necessary. They should be removed and the code allowed to create
// exceptions. But if I do that right now then the majority of tests will fail.
// There's something very messed up with the startup process that needs to be sorted out. todo
if (keyConnection == null) {
logger.warn(">>>> NULL ATTACHMENT: {}", key);
return;
}
if (keyConnection.stream == null) {
logger.warn(">>>> NULL STREAM: {}", keyConnection.peerAddress);
return;
if (keyConnection == null || keyConnection.stream == null) {
logger.warn(">>>> key attachment or stream is null");
throw new RuntimeException("key attachment or stream is null");
}
int numBytesWrite = ((NIOOutputStream) keyConnection.stream.output()).writeToSocket();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ private void onPeerDisconnected(PeerDisconnectedEvent event) {
}
}

// Event Handler:
/** When the connection has been established at a network level, this handler initiates the application level
* handshake process.
*/
private void onPeerMsgReady(PeerMsgReadyEvent event) {
try {
lock.lock();
Expand All @@ -158,6 +160,7 @@ private void onPeerMsgReady(PeerMsgReadyEvent event) {
// For some strange reasons, sometimes this event is triggered several times, so in order to
// prevent from starting the same handshake twice, we check if there is already a handshake in progress
// with this Peer...
// todo: investigate why this might happen
if (handlerInfo.get(peerAddress) == null) {
HandshakePeerInfo peerInfo = new HandshakePeerInfo(peerAddress);
handlerInfo.put(peerAddress, peerInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class UnannouncedBlockTest extends Specification {

/**
* We test that we can send an UNANNOUNCED "Big" block to BSVCL
* todo: test (or code) is unstable
*/
def "Testing Big Block"() {
given:
Expand Down
Loading

0 comments on commit 18a307d

Please sign in to comment.