Skip to content

Commit

Permalink
Forward cancellation to connecting socket reads
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Apr 19, 2024
1 parent 5bd74a0 commit 52e08c0
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions src/Internal/Windows/SocketConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
use Amp\ByteStream\ReadableResourceStream;
use Amp\ByteStream\WritableResourceStream;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\Internal\ProcessStreams;
use Amp\Process\ProcessException;
use Amp\TimeoutCancellation;
use Revolt\EventLoop;
use function Amp\async;

Expand Down Expand Up @@ -70,7 +72,7 @@ public function connectPipes(WindowsHandle $handle, Cancellation $cancellation):
$handle->startBarrier->await($cancellation);

$controlPipe = new ReadableResourceStream($handle->sockets[0]);
$handle->pid = $this->readChildPid($controlPipe);
$handle->pid = $this->readChildPid($controlPipe, $cancellation);
} catch (\Throwable $exception) {
foreach ($handle->sockets as $socket) {
\fclose($socket);
Expand All @@ -97,9 +99,9 @@ public function connectPipes(WindowsHandle $handle, Cancellation $cancellation):
$handle->exitCodeStream = $controlPipe;

$stdin = \WeakReference::create($streams->stdin);
async(function () use ($handle, $stdin) {
async(function () use ($handle, $stdin, $cancellation): void {
try {
$exitCode = $this->readExitCode($handle->exitCodeStream);
$exitCode = $this->readExitCode($handle->exitCodeStream, $cancellation);

$handle->joinDeferred->complete($exitCode);
} catch (\Throwable) {
Expand Down Expand Up @@ -128,11 +130,11 @@ private function acceptClient(): void
throw new \Error("Failed to set client socket to non-blocking mode");
}

async(function () use ($socket) {
async(function () use ($socket): void {
try {
$handle = $this->performClientHandshake($socket);
$handle = $this->performClientHandshake($socket, new TimeoutCancellation(5));
$handle->startBarrier->arrive();
} catch (HandshakeException $e) {
} catch (HandshakeException|CancelledException $e) {
/** @psalm-suppress InvalidScalarArgument */
\fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($e->getCode()));
\fclose($socket);
Expand All @@ -145,13 +147,13 @@ private function acceptClient(): void
*
* @throws HandshakeException
*/
public function performClientHandshake($socket): WindowsHandle
public function performClientHandshake($socket, Cancellation $cancellation): WindowsHandle
{
$stream = new ReadableResourceStream($socket);

$packet = \unpack(
'Csignal/Npid/Cstream_id/a*client_token',
$this->read($stream, self::SECURITY_TOKEN_SIZE + 6)
$this->read($stream, $cancellation, length: self::SECURITY_TOKEN_SIZE + 6)
);

// validate the client's handshake
Expand Down Expand Up @@ -193,7 +195,7 @@ public function performClientHandshake($socket): WindowsHandle
throw new HandshakeException(HandshakeStatus::NO_LONGER_PENDING);
}

$packet = \unpack('Csignal/Cstatus', $this->read($stream, 2));
$packet = \unpack('Csignal/Cstatus', $this->read($stream, $cancellation, length: 2));

if ($packet['signal'] !== SignalCode::HANDSHAKE_ACK || $packet['status'] !== HandshakeStatus::SUCCESS) {
throw new HandshakeException(HandshakeStatus::ACK_STATUS_ERROR);
Expand All @@ -207,9 +209,9 @@ public function performClientHandshake($socket): WindowsHandle
/**
* @return positive-int
*/
private function readChildPid(ReadableResourceStream $stream): int
private function readChildPid(ReadableResourceStream $stream, Cancellation $cancellation): int
{
$packet = \unpack('Csignal/Npid', $this->read($stream, 5));
$packet = \unpack('Csignal/Npid', $this->read($stream, $cancellation, length: 5));
if ($packet['signal'] !== SignalCode::CHILD_PID) {
throw new HandshakeException(HandshakeStatus::SIGNAL_UNEXPECTED);
}
Expand All @@ -219,9 +221,9 @@ private function readChildPid(ReadableResourceStream $stream): int
return $pid;
}

private function readExitCode(ReadableResourceStream $stream): int
private function readExitCode(ReadableResourceStream $stream, Cancellation $cancellation): int
{
$packet = \unpack('Csignal/Ncode', $this->read($stream, 5));
$packet = \unpack('Csignal/Ncode', $this->read($stream, $cancellation, length: 5));

if ($packet['signal'] !== SignalCode::EXIT_CODE) {
throw new HandshakeException(HandshakeStatus::SIGNAL_UNEXPECTED);
Expand All @@ -230,15 +232,15 @@ private function readExitCode(ReadableResourceStream $stream): int
return (int) $packet['code'];
}

private function read(ReadableResourceStream $stream, int $length): string
private function read(ReadableResourceStream $stream, Cancellation $cancellation, int $length): string
{
$buffer = '';

do {
$remaining = $length - \strlen($buffer);
\assert($remaining > 0);

$chunk = $stream->read(limit: $remaining);
$chunk = $stream->read($cancellation, limit: $remaining);
if ($chunk === null) {
break;
}
Expand Down

0 comments on commit 52e08c0

Please sign in to comment.