Skip to content

Commit

Permalink
Update to latest websocket changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Sep 3, 2023
1 parent 24daf3c commit 090367c
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 48 deletions.
8 changes: 3 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,22 @@
"amphp/amp": "^3",
"amphp/byte-stream": "^2",
"amphp/http": "^2",
"amphp/http-client": "^5",
"amphp/http-client": "^5-dev",
"amphp/socket": "^2",
"amphp/websocket": "^2",
"amphp/websocket": "^2-dev",
"league/uri": "^6",
"psr/http-message": "^1",
"revolt/event-loop": "^1"
},
"require-dev": {
"amphp/http-server": "^3",
"amphp/websocket-server": "^3",
"amphp/websocket-server": "^3-dev",
"amphp/phpunit-util": "^3",
"amphp/php-cs-fixer-config": "^2",
"phpunit/phpunit": "^9",
"psr/log": "^1",
"psalm/phar": "^5.4"
},
"minimum-stability": "dev",
"prefer-stable": true,
"autoload": {
"psr-4": {
"Amp\\Websocket\\Client\\": "src"
Expand Down
6 changes: 3 additions & 3 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use Amp\Websocket\Client;
Amp\Loop::run(function () {
/** @var Client\WebsocketConnection $connection */
$connection = yield Client\connect('ws://demos.kaazing.com/echo');
yield $connection->send('Hello!');
yield $connection->sendText('Hello!');

$i = 0;

Expand All @@ -89,9 +89,9 @@ Amp\Loop::run(function () {
yield new Delayed(1000);

if ($i < 3) {
yield $connection->send('Ping: ' . ++$i);
yield $connection->sendText('Ping: ' . ++$i);
} else {
yield $connection->send('Goodbye!');
yield $connection->sendText('Goodbye!');
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions examples/amp.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

$connection = connect($handshake);

$connection->send('Hello!');
$connection->sendText('Hello!');

$i = 0;

Expand All @@ -30,8 +30,8 @@
delay(1);

if ($i < 3) {
$connection->send('Ping: ' . ++$i);
$connection->sendText('Ping: ' . ++$i);
} else {
$connection->send('Goodbye!');
$connection->sendText('Goodbye!');
}
}
42 changes: 27 additions & 15 deletions src/Rfc6455Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
use Amp\Http\Client\Response;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;
use Amp\Websocket\CloseCode;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\WebsocketClientMetadata;
use Amp\Websocket\WebsocketCloseCode;
use Amp\Websocket\WebsocketCount;
use Amp\Websocket\WebsocketMessage;
use Amp\Websocket\WebsocketTimestamp;
use Traversable;

final class Rfc6455Connection implements WebsocketConnection
final class Rfc6455Connection implements WebsocketConnection, \IteratorAggregate

Check failure on line 19 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

MissingTemplateParam

src/Rfc6455Connection.php:19:63: MissingTemplateParam: Amp\Websocket\Client\Rfc6455Connection has missing template params when extending IteratorAggregate, expecting 2 (see https://psalm.dev/182)

Check failure on line 19 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

MissingTemplateParam

src/Rfc6455Connection.php:19:63: MissingTemplateParam: Amp\Websocket\Client\Rfc6455Connection has missing template params when extending IteratorAggregate, expecting 2 (see https://psalm.dev/182)
{
use ForbidCloning;
use ForbidSerialization;
Expand Down Expand Up @@ -63,11 +65,6 @@ public function isClosedByPeer(): bool
return $this->client->isClosedByPeer();

Check failure on line 65 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

NullableReturnStatement

src/Rfc6455Connection.php:65:16: NullableReturnStatement: The declared return type 'bool' for Amp\Websocket\Client\Rfc6455Connection::isClosedByPeer is not nullable, but the function returns 'bool|null' (see https://psalm.dev/139)

Check failure on line 65 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

NullableReturnStatement

src/Rfc6455Connection.php:65:16: NullableReturnStatement: The declared return type 'bool' for Amp\Websocket\Client\Rfc6455Connection::isClosedByPeer is not nullable, but the function returns 'bool|null' (see https://psalm.dev/139)
}

public function getUnansweredPingCount(): int
{
return $this->client->getUnansweredPingCount();
}

public function getCloseCode(): int

Check failure on line 68 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

InvalidNullableReturnType

src/Rfc6455Connection.php:68:37: InvalidNullableReturnType: The declared return type 'int' for Amp\Websocket\Client\Rfc6455Connection::getCloseCode is not nullable, but 'int|null' contains null (see https://psalm.dev/144)

Check failure on line 68 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

InvalidNullableReturnType

src/Rfc6455Connection.php:68:37: InvalidNullableReturnType: The declared return type 'int' for Amp\Websocket\Client\Rfc6455Connection::getCloseCode is not nullable, but 'int|null' contains null (see https://psalm.dev/144)
{
return $this->client->getCloseCode();

Check failure on line 70 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

NullableReturnStatement

src/Rfc6455Connection.php:70:16: NullableReturnStatement: The declared return type 'int' for Amp\Websocket\Client\Rfc6455Connection::getCloseCode is not nullable, but the function returns 'int|null' (see https://psalm.dev/139)

Check failure on line 70 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

NullableReturnStatement

src/Rfc6455Connection.php:70:16: NullableReturnStatement: The declared return type 'int' for Amp\Websocket\Client\Rfc6455Connection::getCloseCode is not nullable, but the function returns 'int|null' (see https://psalm.dev/139)
Expand All @@ -78,19 +75,19 @@ public function getCloseReason(): string
return $this->client->getCloseReason();

Check failure on line 75 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

NullableReturnStatement

src/Rfc6455Connection.php:75:16: NullableReturnStatement: The declared return type 'string' for Amp\Websocket\Client\Rfc6455Connection::getCloseReason is not nullable, but the function returns 'null|string' (see https://psalm.dev/139)

Check failure on line 75 in src/Rfc6455Connection.php

View workflow job for this annotation

GitHub Actions / PHP 8.2

NullableReturnStatement

src/Rfc6455Connection.php:75:16: NullableReturnStatement: The declared return type 'string' for Amp\Websocket\Client\Rfc6455Connection::getCloseReason is not nullable, but the function returns 'null|string' (see https://psalm.dev/139)
}

public function send(string $data): void
public function sendText(string $data): void
{
$this->client->send($data);
$this->client->sendText($data);
}

public function sendBinary(string $data): void
{
$this->client->sendBinary($data);
}

public function stream(ReadableStream $stream): void
public function streamText(ReadableStream $stream): void
{
$this->client->stream($stream);
$this->client->streamText($stream);
}

public function streamBinary(ReadableStream $stream): void
Expand All @@ -103,17 +100,22 @@ public function ping(): void
$this->client->ping();
}

public function getInfo(): WebsocketClientMetadata
public function getCount(WebsocketCount $type): int
{
return $this->client->getCount($type);
}

public function getTimestamp(WebsocketTimestamp $type): int
{
return $this->client->getInfo();
return $this->client->getTimestamp($type);
}

public function isClosed(): bool
{
return $this->client->isClosed();
}

public function close(int $code = CloseCode::NORMAL_CLOSE, string $reason = ''): void
public function close(int $code = WebsocketCloseCode::NORMAL_CLOSE, string $reason = ''): void
{
$this->client->close($code, $reason);
}
Expand All @@ -122,4 +124,14 @@ public function onClose(\Closure $onClose): void
{
$this->client->onClose($onClose);
}

public function isCompressionEnabled(): bool
{
return $this->client->isCompressionEnabled();
}

public function getIterator(): Traversable
{
yield from $this->client;
}
}
14 changes: 7 additions & 7 deletions src/Rfc6455ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
use Amp\ForbidSerialization;
use Amp\Http\Client\Response;
use Amp\Socket\Socket;
use Amp\Websocket\Compression\CompressionContext;
use Amp\Websocket\HeartbeatQueue;
use Amp\Websocket\Compression\WebsocketCompressionContext;
use Amp\Websocket\Parser\Rfc6455ParserFactory;
use Amp\Websocket\Parser\WebsocketParserFactory;
use Amp\Websocket\RateLimiter;
use Amp\Websocket\Rfc6455Client;
use Amp\Websocket\WebsocketHeartbeatQueue;
use Amp\Websocket\WebsocketRateLimit;

final class Rfc6455ConnectionFactory implements WebsocketConnectionFactory
{
use ForbidCloning;
use ForbidSerialization;

public function __construct(
private readonly ?HeartbeatQueue $heartbeatQueue = null,
private readonly ?RateLimiter $rateLimiter = null,
private readonly ?WebsocketHeartbeatQueue $heartbeatQueue = null,
private readonly ?WebsocketRateLimit $rateLimit = null,
private readonly WebsocketParserFactory $parserFactory = new Rfc6455ParserFactory(
messageSizeLimit: Rfc6455Connection::DEFAULT_MESSAGE_SIZE_LIMIT,
frameSizeLimit: Rfc6455Connection::DEFAULT_FRAME_SIZE_LIMIT,
Expand All @@ -33,15 +33,15 @@ public function __construct(
public function createConnection(
Response $handshakeResponse,
Socket $socket,
?CompressionContext $compressionContext = null,
?WebsocketCompressionContext $compressionContext = null,
): WebsocketConnection {
$client = new Rfc6455Client(
socket: $socket,
masked: true,
parserFactory: $this->parserFactory,
compressionContext: $compressionContext,
heartbeatQueue: $this->heartbeatQueue,
rateLimiter: $this->rateLimiter,
rateLimit: $this->rateLimit,
frameSplitThreshold: $this->frameSplitThreshold,
closePeriod: $this->closePeriod,
);
Expand Down
6 changes: 3 additions & 3 deletions src/Rfc6455Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
use Amp\Socket\ConnectContext;
use Amp\Socket\Socket;
use Amp\Websocket;
use Amp\Websocket\Compression\CompressionContextFactory;
use Amp\Websocket\Compression\Rfc7692CompressionFactory;
use Amp\Websocket\Compression\WebsocketCompressionContextFactory;

final class Rfc6455Connector implements WebsocketConnector
{
Expand All @@ -27,12 +27,12 @@ final class Rfc6455Connector implements WebsocketConnector
private readonly HttpClient $httpClient;

/**
* @param CompressionContextFactory|null $compressionContextFactory Use null to disable compression.
* @param WebsocketCompressionContextFactory|null $compressionContextFactory Use null to disable compression.
*/
public function __construct(
private readonly WebsocketConnectionFactory $connectionFactory = new Rfc6455ConnectionFactory(),
HttpClient $httpClient = null,
private readonly ?CompressionContextFactory $compressionContextFactory = new Rfc7692CompressionFactory(),
private readonly ?WebsocketCompressionContextFactory $compressionContextFactory = new Rfc7692CompressionFactory(),
) {
$this->httpClient = $httpClient
?? (new HttpClientBuilder)->usingPool(
Expand Down
6 changes: 3 additions & 3 deletions src/WebsocketConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

use Amp\Http\Client\Response;
use Amp\Socket\Socket;
use Amp\Websocket\Compression\CompressionContext;
use Amp\Websocket\Compression\WebsocketCompressionContext;

interface WebsocketConnectionFactory
{
/**
* @param Response $handshakeResponse Response that initiated the websocket connection.
* @param Socket $socket Underlying socket to be used for network communication.
* @param CompressionContext|null $compressionContext CompressionContext generated from the response headers.
* @param WebsocketCompressionContext|null $compressionContext CompressionContext generated from the response headers.
*/
public function createConnection(
Response $handshakeResponse,
Socket $socket,
?CompressionContext $compressionContext = null,
?WebsocketCompressionContext $compressionContext = null,
): WebsocketConnection;
}
6 changes: 3 additions & 3 deletions test-autobahn/runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use Amp\Websocket\Client\Rfc6455ConnectionFactory;
use Amp\Websocket\Client\Rfc6455Connector;
use Amp\Websocket\Client\WebsocketHandshake;
use Amp\Websocket\ClosedException;
use Amp\Websocket\Parser\Rfc6455ParserFactory;
use Amp\Websocket\WebsocketClosedException;

require __DIR__ . '/../vendor/autoload.php';

Expand Down Expand Up @@ -42,10 +42,10 @@
if ($message->isBinary()) {
$connection->sendBinary($content);
} else {
$connection->send($content);
$connection->sendText($content);
}
}
} catch (ClosedException $e) {
} catch (WebsocketClosedException $e) {
// ignore
} catch (AssertionError $e) {
print 'Assertion error: ' . $e->getMessage() . PHP_EOL;
Expand Down
12 changes: 6 additions & 6 deletions test/WebsocketConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
use Amp\Socket\SocketException;
use Amp\TimeoutCancellation;
use Amp\Websocket\Client;
use Amp\Websocket\ClosedException;
use Amp\Websocket\Parser\Rfc6455ParserFactory;
use Amp\Websocket\Server\EmptyWebsocketHandshakeHandler;
use Amp\Websocket\Server\Websocket;
use Amp\Websocket\Server\WebsocketClientHandler;
use Amp\Websocket\WebsocketClient;
use Amp\Websocket\WebsocketClosedException;
use Psr\Log\NullLogger;
use function Amp\async;
use function Amp\delay;
Expand Down Expand Up @@ -82,14 +82,14 @@ public function testSimpleTextEcho(): void
public function handleClient(WebsocketClient $client, Request $request, Response $response): void
{
while ($message = $client->receive()) {
$client->send($message->buffer());
$client->sendText($message->buffer());
}
}
});

try {
$client = connect('ws://' . $address->toString());
$client->send('Hey!');
$client->sendText('Hey!');

$message = $client->receive();

Expand All @@ -110,8 +110,8 @@ public function testUnconsumedMessage(): void
[$server, $address] = $this->createServer(new class implements WebsocketClientHandler {
public function handleClient(WebsocketClient $client, Request $request, Response $response): void
{
$client->send(\str_repeat('.', 1024 * 1024));
$client->send('Message');
$client->sendText(\str_repeat('.', 1024 * 1024));
$client->sendText('Message');
$client->close();
}
});
Expand Down Expand Up @@ -188,7 +188,7 @@ public function handleClient(WebsocketClient $client, Request $request, Response
$message->buffer();

self::fail('Buffering the message should have thrown a ClosedException due to exceeding the message size limit');
} catch (ClosedException $exception) {
} catch (WebsocketClosedException $exception) {
$this->assertSame('Received payload exceeds maximum allowable size', $exception->getReason());
} finally {
$server->stop();
Expand Down

0 comments on commit 090367c

Please sign in to comment.