diff --git a/.github/workflows/acceptance.yml b/.github/workflows/acceptance.yml
index 06c1766..a55f2a7 100644
--- a/.github/workflows/acceptance.yml
+++ b/.github/workflows/acceptance.yml
@@ -3,42 +3,12 @@ name: Acceptance
on: [push, pull_request]
jobs:
- test-7-2:
- runs-on: ubuntu-latest
- name: Test PHP 7.2
- steps:
- - name: Checkout
- uses: actions/checkout@v2
- - name: Set up PHP 7.2
- uses: shivammathur/setup-php@v2
- with:
- php-version: '7.2'
- - name: Composer
- run: make install
- - name: Test
- run: make test
-
- test-7-3:
- runs-on: ubuntu-latest
- name: Test PHP 7.3
- steps:
- - name: Checkout
- uses: actions/checkout@v2
- - name: Set up PHP 7.3
- uses: shivammathur/setup-php@v2
- with:
- php-version: '7.3'
- - name: Composer
- run: make install
- - name: Test
- run: make test
-
test-7-4:
runs-on: ubuntu-latest
name: Test PHP 7.4
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v3
- name: Set up PHP 7.4
uses: shivammathur/setup-php@v2
with:
@@ -53,7 +23,7 @@ jobs:
name: Test PHP 8.0
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v3
- name: Set up PHP 8.0
uses: shivammathur/setup-php@v2
with:
@@ -68,7 +38,7 @@ jobs:
name: Test PHP 8.1
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v3
- name: Set up PHP 8.1
uses: shivammathur/setup-php@v2
with:
@@ -78,13 +48,27 @@ jobs:
- name: Test
run: make test
+ test-8-2:
+ runs-on: ubuntu-latest
+ name: Test PHP 8.2
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ - name: Set up PHP 8.2
+ uses: shivammathur/setup-php@v2
+ with:
+ php-version: '8.2'
+ - name: Composer
+ run: make install
+ - name: Test
+ run: make test
cs-check:
runs-on: ubuntu-latest
name: Code standard
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v3
- name: Set up PHP 8.0
uses: shivammathur/setup-php@v2
with:
@@ -99,7 +83,7 @@ jobs:
name: Code coverage
steps:
- name: Checkout
- uses: actions/checkout@v2
+ uses: actions/checkout@v3
- name: Set up PHP 8.0
uses: shivammathur/setup-php@v2
with:
diff --git a/Makefile b/Makefile
index 930a9ed..54d507e 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ test: composer.lock
./vendor/bin/phpunit
cs-check: composer.lock
- ./vendor/bin/phpcs --standard=codestandard.xml lib tests examples
+ ./vendor/bin/phpcs --standard=PSR1,PSR12 --encoding=UTF-8 --report=full --colors lib tests examples
coverage: composer.lock build
XDEBUG_MODE=coverage ./vendor/bin/phpunit --coverage-clover build/logs/clover.xml
diff --git a/README.md b/README.md
index 70c16cc..ab55cf2 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,6 @@ It does not include convenience operations such as listeners and implicit error
- [Client](docs/Client.md)
- [Server](docs/Server.md)
-- [Message](docs/Message.md)
- [Examples](docs/Examples.md)
- [Changelog](docs/Changelog.md)
- [Contributing](docs/Contributing.md)
@@ -24,7 +23,8 @@ Preferred way to install is with [Composer](https://getcomposer.org/).
composer require textalk/websocket
```
-* Current version support PHP versions `^7.2|^8.0`.
+* Current version support PHP versions `^7.4|^8.0`.
+* For PHP `7.2` and `7.3` support use version [`1.5`](https://github.com/Textalk/websocket-php/tree/1.5.0).
* For PHP `7.1` support use version [`1.4`](https://github.com/Textalk/websocket-php/tree/1.4.0).
* For PHP `^5.4` and `7.0` support use version [`1.3`](https://github.com/Textalk/websocket-php/tree/1.3.0).
diff --git a/codestandard.xml b/codestandard.xml
deleted file mode 100644
index bb1cd26..0000000
--- a/codestandard.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/composer.json b/composer.json
index 9bc0dcc..23018ee 100644
--- a/composer.json
+++ b/composer.json
@@ -8,8 +8,7 @@
"name": "Fredrik Liljegren"
},
{
- "name": "Sören Jensen",
- "email": "soren@abicart.se"
+ "name": "Sören Jensen"
}
],
"autoload": {
@@ -23,11 +22,14 @@
}
},
"require": {
- "php": "^7.2 | ^8.0",
- "psr/log": "^1 | ^2 | ^3"
+ "php": "^7.4 | ^8.0",
+ "phrity/net-uri": "^1.0",
+ "phrity/util-errorhandler": "^1.0",
+ "psr/log": "^1.0 | ^2.0 | ^3.0",
+ "psr/http-message": "^1.0"
},
"require-dev": {
- "phpunit/phpunit": "^8.0|^9.0",
+ "phpunit/phpunit": "^9.0",
"php-coveralls/php-coveralls": "^2.0",
"squizlabs/php_codesniffer": "^3.5"
}
diff --git a/docs/Changelog.md b/docs/Changelog.md
index 4d98063..c9e086a 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -2,6 +2,18 @@
# Websocket: Changelog
+## `v1.6`
+
+ > PHP version `^7.4|^8.0`
+
+### `1.6.0`
+ * Connection separate from Client and Server (@sirn-se)
+ * getPier() deprecated, replaced by getRemoteName() (@sirn-se)
+ * Client accepts `Psr\Http\Message\UriInterface` as input for URI:s (@sirn-se)
+ * Bad URI throws exception when Client is instanciated, previously when used (@sirn-se)
+ * Preparations for multiple conection and listeners (@sirn-se)
+ * Major internal refactoring (@sirn-se)
+
## `v1.5`
> PHP version `^7.2|^8.0`
diff --git a/docs/Client.md b/docs/Client.md
index e6154b6..be8d285 100644
--- a/docs/Client.md
+++ b/docs/Client.md
@@ -10,27 +10,27 @@ It internally supports Upgrade handshake and implicit close and ping/pong operat
```php
WebSocket\Client {
- public __construct(string $uri, array $options = [])
- public __destruct()
- public __toString() : string
-
- public text(string $payload) : void
- public binary(string $payload) : void
- public ping(string $payload = '') : void
- public pong(string $payload = '') : void
- public send(mixed $payload, string $opcode = 'text', bool $masked = true) : void
- public receive() : mixed
- public close(int $status = 1000, mixed $message = 'ttfn') : mixed
-
- public getName() : string|null
- public getPier() : string|null
- public getLastOpcode() : string
- public getCloseStatus() : int
- public isConnected() : bool
- public setTimeout(int $seconds) : void
- public setFragmentSize(int $fragment_size) : self
- public getFragmentSize() : int
- public setLogger(Psr\Log\LoggerInterface $logger = null) : void
+ public __construct(UriInterface|string $uri, array $options = []);
+ public __destruct();
+ public __toString() : string;
+
+ public text(string $payload) : void;
+ public binary(string $payload) : void;
+ public ping(string $payload = '') : void;
+ public pong(string $payload = '') : void;
+ public send(Message|string $payload, string $opcode = 'text', bool $masked = true) : void;
+ public close(int $status = 1000, mixed $message = 'ttfn') : void;
+ public receive() : Message|string|null;
+
+ public getName() : string|null;
+ public getRemoteName() : string|null;
+ public getLastOpcode() : string;
+ public getCloseStatus() : int;
+ public isConnected() : bool;
+ public setTimeout(int $seconds) : void;
+ public setFragmentSize(int $fragment_size) : self;
+ public getFragmentSize() : int;
+ public setLogger(Psr\Log\LoggerInterface $logger = null) : void;
}
```
diff --git a/docs/Contributing.md b/docs/Contributing.md
index 263d868..c68ab83 100644
--- a/docs/Contributing.md
+++ b/docs/Contributing.md
@@ -12,6 +12,13 @@ Requirements on pull requests;
* Code coverage **MUST** remain at 100%.
* Code **MUST** adhere to PSR-1 and PSR-12 code standards.
+Base your patch on corresponding version branch, and target that version branch in your pull request.
+
+* `v1.6-master` current version
+* `v1.5-master` previous version, bug fixes only
+* Older versions should not be target of pull requests
+
+
## Dependency management
Install or update dependencies using [Composer](https://getcomposer.org/).
diff --git a/docs/Examples.md b/docs/Examples.md
index 7dd4e0c..399e0cc 100644
--- a/docs/Examples.md
+++ b/docs/Examples.md
@@ -65,10 +65,13 @@ php examples/echoserver.php --debug // Use runtime debugging
```
These strings can be sent as message to trigger server to perform actions;
-* `exit` - Server will initiate close procedure
-* `ping` - Server will send a ping message
-* `headers` - Server will respond with all headers provided by client
* `auth` - Server will respond with auth header if provided by client
+* `close` - Server will close current connection
+* `exit` - Server will close all active connections
+* `headers` - Server will respond with all headers provided by client
+* `ping` - Server will send a ping message
+* `pong` - Server will send a pong message
+* `stop` - Server will stop listening
* For other sent strings, server will respond with the same strings
## The `random` client
diff --git a/docs/Message.md b/docs/Message.md
index 9bd0f2b..80df04a 100644
--- a/docs/Message.md
+++ b/docs/Message.md
@@ -14,22 +14,22 @@ Available classes correspond to opcode;
Additionally;
* WebSocket\Message\Message - abstract base class for all messages above
-* WebSocket\Message\Factory - Factory class to create Msssage instances
+* WebSocket\Message\Factory - Factory class to create Message instances
## Message abstract class synopsis
```php
WebSocket\Message\Message {
- public __construct(string $payload = '')
- public __toString() : string
+ public __construct(string $payload = '');
+ public __toString() : string;
- public getOpcode() : string
- public getLength() : int
- public getTimestamp() : DateTime
- public getContent() : string
- public setContent(string $payload = '') : void
- public hasContent() : bool
+ public getOpcode() : string;
+ public getLength() : int;
+ public getTimestamp() : DateTime;
+ public getContent() : string;
+ public setContent(string $payload = '') : void;
+ public hasContent() : bool;
}
```
@@ -38,7 +38,7 @@ WebSocket\Message\Message {
```php
WebSocket\Message\Factory {
- public create(string $opcode, string $payload = '') : Message
+ public create(string $opcode, string $payload = '') : Message;
}
```
diff --git a/docs/Server.md b/docs/Server.md
index 7d01a41..9e12e07 100644
--- a/docs/Server.md
+++ b/docs/Server.md
@@ -13,33 +13,33 @@ If you require this kind of server behavior, you need to build it on top of prov
```php
WebSocket\Server {
- public __construct(array $options = [])
- public __destruct()
- public __toString() : string
-
- public accept() : bool
- public text(string $payload) : void
- public binary(string $payload) : void
- public ping(string $payload = '') : void
- public pong(string $payload = '') : void
- public send(mixed $payload, string $opcode = 'text', bool $masked = true) : void
- public receive() : mixed
- public close(int $status = 1000, mixed $message = 'ttfn') : mixed
-
- public getPort() : int
- public getPath() : string
- public getRequest() : array
- public getHeader(string $header_name) : string|null
-
- public getName() : string|null
- public getPier() : string|null
- public getLastOpcode() : string
- public getCloseStatus() : int
- public isConnected() : bool
- public setTimeout(int $seconds) : void
- public setFragmentSize(int $fragment_size) : self
- public getFragmentSize() : int
- public setLogger(Psr\Log\LoggerInterface $logger = null) : void
+ public __construct(array $options = []);
+ public __destruct();
+ public __toString() : string;
+
+ public accept() : bool;
+ public text(string $payload) : void;
+ public binary(string $payload) : void;
+ public ping(string $payload = '') : void;
+ public pong(string $payload = '') : void;
+ public send(Message|string $payload, string $opcode = 'text', bool $masked = true) : void;
+ public close(int $status = 1000, mixed $message = 'ttfn') : void;
+ public receive() : Message|string|null;
+
+ public getPort() : int;
+ public getPath() : string;
+ public getRequest() : array;
+ public getHeader(string $header_name) : string|null;
+
+ public getName() : string|null;
+ public getRemoteName() : string|null;
+ public getLastOpcode() : string;
+ public getCloseStatus() : int;
+ public isConnected() : bool;
+ public setTimeout(int $seconds) : void;
+ public setFragmentSize(int $fragment_size) : self;
+ public getFragmentSize() : int;
+ public setLogger(Psr\Log\LoggerInterface $logger = null) : void;
}
```
@@ -124,7 +124,7 @@ $server = new WebSocket\Server([
'filter' => ['text', 'binary', 'ping'], // Specify message types for receive() to return
'logger' => $my_psr3_logger, // Attach a PSR3 compatible logger
'port' => 9000, // Listening port
- 'return_obj' => true, // Return Message insatnce rather than just text
+ 'return_obj' => true, // Return Message instance rather than just text
'timeout' => 60, // 1 minute time out
]);
```
diff --git a/examples/echoserver.php b/examples/echoserver.php
index 231c4c9..a85e564 100644
--- a/examples/echoserver.php
+++ b/examples/echoserver.php
@@ -16,13 +16,13 @@
error_reporting(-1);
-echo "> Random server\n";
+echo "> Echo server\n";
// Server options specified or random
$options = array_merge([
'port' => 8000,
'timeout' => 200,
- 'filter' => ['text', 'binary', 'ping', 'pong'],
+ 'filter' => ['text', 'binary', 'ping', 'pong', 'close'],
], getopt('', ['port:', 'timeout:', 'debug']));
// If debug mode and logger is available
@@ -32,7 +32,7 @@
echo "> Using logger\n";
}
-// Setting timeout to 200 seconds to make time for all tests and manual runs.
+// Initiate server.
try {
$server = new Server($options);
} catch (ConnectionException $e) {
diff --git a/lib/BadOpcodeException.php b/lib/BadOpcodeException.php
index a518715..260a977 100644
--- a/lib/BadOpcodeException.php
+++ b/lib/BadOpcodeException.php
@@ -1,5 +1,12 @@
0,
- 'text' => 1,
- 'binary' => 2,
- 'close' => 8,
- 'ping' => 9,
- 'pong' => 10,
- ];
-
- public function getLastOpcode(): ?string
- {
- return $this->last_opcode;
- }
-
- public function getCloseStatus(): ?int
- {
- return $this->close_status;
- }
-
- public function isConnected(): bool
- {
- return $this->socket &&
- (get_resource_type($this->socket) == 'stream' ||
- get_resource_type($this->socket) == 'persistent stream');
- }
-
- public function setTimeout(int $timeout): void
- {
- $this->options['timeout'] = $timeout;
-
- if ($this->isConnected()) {
- stream_set_timeout($this->socket, $timeout);
- }
- }
-
- public function setFragmentSize(int $fragment_size): self
- {
- $this->options['fragment_size'] = $fragment_size;
- return $this;
- }
-
- public function getFragmentSize(): int
- {
- return $this->options['fragment_size'];
- }
-
- public function setLogger(LoggerInterface $logger = null): void
- {
- $this->logger = $logger ?: new NullLogger();
- }
-
- public function send(string $payload, string $opcode = 'text', bool $masked = true): void
- {
- if (!$this->isConnected()) {
- $this->connect();
- }
-
- if (!in_array($opcode, array_keys(self::$opcodes))) {
- $warning = "Bad opcode '{$opcode}'. Try 'text' or 'binary'.";
- $this->logger->warning($warning);
- throw new BadOpcodeException($warning);
- }
-
- $payload_chunks = str_split($payload, $this->options['fragment_size']);
- $frame_opcode = $opcode;
-
- for ($index = 0; $index < count($payload_chunks); ++$index) {
- $chunk = $payload_chunks[$index];
- $final = $index == count($payload_chunks) - 1;
-
- $this->sendFragment($final, $chunk, $frame_opcode, $masked);
-
- // all fragments after the first will be marked a continuation
- $frame_opcode = 'continuation';
- }
-
- $this->logger->info("Sent '{$opcode}' message", [
- 'opcode' => $opcode,
- 'content-length' => strlen($payload),
- 'frames' => count($payload_chunks),
- ]);
- }
-
- /**
- * Convenience method to send text message
- * @param string $payload Content as string
- */
- public function text(string $payload): void
- {
- $this->send($payload);
- }
-
- /**
- * Convenience method to send binary message
- * @param string $payload Content as binary string
- */
- public function binary(string $payload): void
- {
- $this->send($payload, 'binary');
- }
-
- /**
- * Convenience method to send ping
- * @param string $payload Optional text as string
- */
- public function ping(string $payload = ''): void
- {
- $this->send($payload, 'ping');
- }
-
- /**
- * Convenience method to send unsolicited pong
- * @param string $payload Optional text as string
- */
- public function pong(string $payload = ''): void
- {
- $this->send($payload, 'pong');
- }
-
- /**
- * Get name of local socket, or null if not connected
- * @return string|null
- */
- public function getName(): ?string
- {
- return $this->isConnected() ? stream_socket_get_name($this->socket, false) : null;
- }
-
- /**
- * Get name of remote socket, or null if not connected
- * @return string|null
- */
- public function getPier(): ?string
- {
- return $this->isConnected() ? stream_socket_get_name($this->socket, true) : null;
- }
-
- /**
- * Get string representation of instance
- * @return string String representation
- */
- public function __toString(): string
- {
- return sprintf(
- "%s(%s)",
- get_class($this),
- $this->getName() ?: 'closed'
- );
- }
-
- /**
- * Receive one message.
- * Will continue reading until read message match filter settings.
- * Return Message instance or string according to settings.
- */
- protected function sendFragment(bool $final, string $payload, string $opcode, bool $masked): void
- {
- $data = '';
-
- $byte_1 = $final ? 0b10000000 : 0b00000000; // Final fragment marker.
- $byte_1 |= self::$opcodes[$opcode]; // Set opcode.
- $data .= pack('C', $byte_1);
-
- $byte_2 = $masked ? 0b10000000 : 0b00000000; // Masking bit marker.
-
- // 7 bits of payload length...
- $payload_length = strlen($payload);
- if ($payload_length > 65535) {
- $data .= pack('C', $byte_2 | 0b01111111);
- $data .= pack('J', $payload_length);
- } elseif ($payload_length > 125) {
- $data .= pack('C', $byte_2 | 0b01111110);
- $data .= pack('n', $payload_length);
- } else {
- $data .= pack('C', $byte_2 | $payload_length);
- }
-
- // Handle masking
- if ($masked) {
- // generate a random mask:
- $mask = '';
- for ($i = 0; $i < 4; $i++) {
- $mask .= chr(rand(0, 255));
- }
- $data .= $mask;
-
- // Append payload to frame:
- for ($i = 0; $i < $payload_length; $i++) {
- $data .= $payload[$i] ^ $mask[$i % 4];
- }
- } else {
- $data .= $payload;
- }
-
- $this->write($data);
- $this->logger->debug("Sent '{$opcode}' frame", [
- 'opcode' => $opcode,
- 'final' => $final,
- 'content-length' => strlen($payload),
- ]);
- }
-
- public function receive()
- {
- $filter = $this->options['filter'];
- if (!$this->isConnected()) {
- $this->connect();
- }
-
- do {
- $response = $this->receiveFragment();
- list ($payload, $final, $opcode) = $response;
-
- // Continuation and factual opcode
- $continuation = ($opcode == 'continuation');
- $payload_opcode = $continuation ? $this->read_buffer['opcode'] : $opcode;
-
- // Filter frames
- if (!in_array($payload_opcode, $filter)) {
- if ($payload_opcode == 'close') {
- return null; // Always abort receive on close
- }
- $final = false;
- continue; // Continue reading
- }
-
- // First continuation frame, create buffer
- if (!$final && !$continuation) {
- $this->read_buffer = ['opcode' => $opcode, 'payload' => $payload, 'frames' => 1];
- continue; // Continue reading
- }
-
- // Subsequent continuation frames, add to buffer
- if ($continuation) {
- $this->read_buffer['payload'] .= $payload;
- $this->read_buffer['frames']++;
- }
- } while (!$final);
-
- // Final, return payload
- $frames = 1;
- if ($continuation) {
- $payload = $this->read_buffer['payload'];
- $frames = $this->read_buffer['frames'];
- $this->read_buffer = null;
- }
- $this->logger->info("Received '{opcode}' message", [
- 'opcode' => $payload_opcode,
- 'content-length' => strlen($payload),
- 'frames' => $frames,
- ]);
-
- $this->last_opcode = $payload_opcode;
- $factory = new Factory();
- return $this->options['return_obj']
- ? $factory->create($payload_opcode, $payload)
- : $payload;
- }
-
- protected function receiveFragment(): array
- {
- // Read the fragment "header" first, two bytes.
- $data = $this->read(2);
- list ($byte_1, $byte_2) = array_values(unpack('C*', $data));
-
- $final = (bool)($byte_1 & 0b10000000); // Final fragment marker.
- $rsv = $byte_1 & 0b01110000; // Unused bits, ignore
-
- // Parse opcode
- $opcode_int = $byte_1 & 0b00001111;
- $opcode_ints = array_flip(self::$opcodes);
- if (!array_key_exists($opcode_int, $opcode_ints)) {
- $warning = "Bad opcode in websocket frame: {$opcode_int}";
- $this->logger->warning($warning);
- throw new ConnectionException($warning, ConnectionException::BAD_OPCODE);
- }
- $opcode = $opcode_ints[$opcode_int];
-
- // Masking bit
- $mask = (bool)($byte_2 & 0b10000000);
-
- $payload = '';
-
- // Payload length
- $payload_length = $byte_2 & 0b01111111;
-
- if ($payload_length > 125) {
- if ($payload_length === 126) {
- $data = $this->read(2); // 126: Payload is a 16-bit unsigned int
- $payload_length = current(unpack('n', $data));
- } else {
- $data = $this->read(8); // 127: Payload is a 64-bit unsigned int
- $payload_length = current(unpack('J', $data));
- }
- }
-
- // Get masking key.
- if ($mask) {
- $masking_key = $this->read(4);
- }
-
- // Get the actual payload, if any (might not be for e.g. close frames.
- if ($payload_length > 0) {
- $data = $this->read($payload_length);
-
- if ($mask) {
- // Unmask payload.
- for ($i = 0; $i < $payload_length; $i++) {
- $payload .= ($data[$i] ^ $masking_key[$i % 4]);
- }
- } else {
- $payload = $data;
- }
- }
-
- $this->logger->debug("Read '{opcode}' frame", [
- 'opcode' => $opcode,
- 'final' => $final,
- 'content-length' => strlen($payload),
- ]);
-
- // if we received a ping, send a pong and wait for the next message
- if ($opcode === 'ping') {
- $this->logger->debug("Received 'ping', sending 'pong'.");
- $this->send($payload, 'pong', true);
- return [$payload, true, $opcode];
- }
-
- // if we received a pong, wait for the next message
- if ($opcode === 'pong') {
- $this->logger->debug("Received 'pong'.");
- return [$payload, true, $opcode];
- }
-
- if ($opcode === 'close') {
- $status_bin = '';
- $status = '';
- // Get the close status.
- $status_bin = '';
- $status = '';
- if ($payload_length > 0) {
- $status_bin = $payload[0] . $payload[1];
- $status = current(unpack('n', $payload));
- $this->close_status = $status;
- }
- // Get additional close message
- if ($payload_length >= 2) {
- $payload = substr($payload, 2);
- }
-
- $this->logger->debug("Received 'close', status: {$this->close_status}.");
-
- if ($this->is_closing) {
- $this->is_closing = false; // A close response, all done.
- } else {
- $this->send($status_bin . 'Close acknowledged: ' . $status, 'close', true); // Respond.
- }
-
- // Close the socket.
- fclose($this->socket);
-
- // Closing should not return message.
- return [$payload, true, $opcode];
- }
-
- return [$payload, $final, $opcode];
- }
-
- /**
- * Tell the socket to close.
- *
- * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
- * @param string $message A closing message, max 125 bytes.
- */
- public function close(int $status = 1000, string $message = 'ttfn'): void
- {
- if (!$this->isConnected()) {
- return;
- }
- $status_binstr = sprintf('%016b', $status);
- $status_str = '';
- foreach (str_split($status_binstr, 8) as $binstr) {
- $status_str .= chr(bindec($binstr));
- }
- $this->send($status_str . $message, 'close', true);
- $this->logger->debug("Closing with status: {$status_str}.");
-
- $this->is_closing = true;
- $this->receive(); // Receiving a close frame will close the socket now.
- }
-
- /**
- * Disconnect from client/server.
- */
- public function disconnect(): void
- {
- if ($this->isConnected()) {
- fclose($this->socket);
- $this->socket = null;
- }
- }
-
- protected function write(string $data): void
- {
- $length = strlen($data);
- $written = @fwrite($this->socket, $data);
- if ($written === false) {
- $this->throwException("Failed to write {$length} bytes.");
- }
- if ($written < strlen($data)) {
- $this->throwException("Could only write {$written} out of {$length} bytes.");
- }
- $this->logger->debug("Wrote {$written} of {$length} bytes.");
- }
-
- protected function read(string $length): string
- {
- $data = '';
- while (strlen($data) < $length) {
- $buffer = @fread($this->socket, $length - strlen($data));
-
- if (!$buffer) {
- $meta = stream_get_meta_data($this->socket);
- if (!empty($meta['timed_out'])) {
- $message = 'Client read timeout';
- $this->logger->error($message, $meta);
- throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
- }
- }
- if ($buffer === false) {
- $read = strlen($data);
- $this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
- }
- if ($buffer === '') {
- $this->throwException("Empty read; connection dead?");
- }
- $data .= $buffer;
- $read = strlen($data);
- $this->logger->debug("Read {$read} of {$length} bytes.");
- }
- return $data;
- }
-
- protected function throwException(string $message, int $code = 0): void
- {
- $meta = ['closed' => true];
- if ($this->isConnected()) {
- $meta = stream_get_meta_data($this->socket);
- fclose($this->socket);
- $this->socket = null;
- }
- if (!empty($meta['timed_out'])) {
- $this->logger->error($message, $meta);
- throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
- }
- if (!empty($meta['eof'])) {
- $code = ConnectionException::EOF;
- }
- $this->logger->error($message, $meta);
- throw new ConnectionException($message, $code, $meta);
- }
-}
diff --git a/lib/Client.php b/lib/Client.php
index 71d320a..6270b66 100644
--- a/lib/Client.php
+++ b/lib/Client.php
@@ -1,7 +1,7 @@
null,
- 'filter' => ['text', 'binary'],
- 'fragment_size' => 4096,
- 'headers' => null,
- 'logger' => null,
- 'origin' => null, // @deprecated
- 'persistent' => false,
- 'return_obj' => false,
- 'timeout' => 5,
+ 'context' => null,
+ 'filter' => ['text', 'binary'],
+ 'fragment_size' => 4096,
+ 'headers' => null,
+ 'logger' => null,
+ 'origin' => null, // @deprecated
+ 'persistent' => false,
+ 'return_obj' => false,
+ 'timeout' => 5,
];
- protected $socket_uri;
+ private $socket_uri;
+ private $connection;
+ private $options = [];
+ private $listen = false;
+ private $last_opcode = null;
+
+
+ /* ---------- Magic methods ------------------------------------------------------ */
/**
- * @param string $uri A ws/wss-URI
- * @param array $options
+ * @param UriInterface|string $uri A ws/wss-URI
+ * @param array $options
* Associative array containing:
* - context: Set the stream context. Default: empty context
* - timeout: Set the socket timeout in seconds. Default: 5
* - fragment_size: Set framgemnt size. Default: 4096
* - headers: Associative array of headers to set/override.
*/
- public function __construct(string $uri, array $options = [])
+ public function __construct($uri, array $options = [])
{
- $this->options = array_merge(self::$default_options, $options);
- $this->socket_uri = $uri;
+ $this->socket_uri = $this->parseUri($uri);
+ $this->options = array_merge(self::$default_options, [
+ 'logger' => new NullLogger(),
+ ], $options);
$this->setLogger($this->options['logger']);
}
- public function __destruct()
+ /**
+ * Get string representation of instance.
+ * @return string String representation.
+ */
+ public function __toString(): string
{
- if ($this->isConnected() && get_resource_type($this->socket) !== 'persistent stream') {
- fclose($this->socket);
+ return sprintf(
+ "%s(%s)",
+ get_class($this),
+ $this->getName() ?: 'closed'
+ );
+ }
+
+
+ /* ---------- Client option functions -------------------------------------------- */
+
+ /**
+ * Set timeout.
+ * @param int $timeout Timeout in seconds.
+ */
+ public function setTimeout(int $timeout): void
+ {
+ $this->options['timeout'] = $timeout;
+ if (!$this->isConnected()) {
+ return;
}
- $this->socket = null;
+ $this->connection->setTimeout($timeout);
+ $this->connection->setOptions($this->options);
}
/**
- * Perform WebSocket handshake
+ * Set fragmentation size.
+ * @param int $fragment_size Fragment size in bytes.
+ * @return self.
*/
- protected function connect(): void
+ public function setFragmentSize(int $fragment_size): self
{
- $url_parts = parse_url($this->socket_uri);
- if (empty($url_parts) || empty($url_parts['scheme']) || empty($url_parts['host'])) {
- $error = "Invalid url '{$this->socket_uri}' provided.";
- $this->logger->error($error);
- throw new BadUriException($error);
+ $this->options['fragment_size'] = $fragment_size;
+ $this->connection->setOptions($this->options);
+ return $this;
+ }
+
+ /**
+ * Get fragmentation size.
+ * @return int $fragment_size Fragment size in bytes.
+ */
+ public function getFragmentSize(): int
+ {
+ return $this->options['fragment_size'];
+ }
+
+
+ /* ---------- Connection operations ---------------------------------------------- */
+
+ /**
+ * Send text message.
+ * @param string $payload Content as string.
+ */
+ public function text(string $payload): void
+ {
+ $this->send($payload);
+ }
+
+ /**
+ * Send binary message.
+ * @param string $payload Content as binary string.
+ */
+ public function binary(string $payload): void
+ {
+ $this->send($payload, 'binary');
+ }
+
+ /**
+ * Send ping.
+ * @param string $payload Optional text as string.
+ */
+ public function ping(string $payload = ''): void
+ {
+ $this->send($payload, 'ping');
+ }
+
+ /**
+ * Send unsolicited pong.
+ * @param string $payload Optional text as string.
+ */
+ public function pong(string $payload = ''): void
+ {
+ $this->send($payload, 'pong');
+ }
+
+ /**
+ * Send message.
+ * @param string $payload Message to send.
+ * @param string $opcode Opcode to use, default: 'text'.
+ * @param bool $masked If message should be masked default: true.
+ */
+ public function send(string $payload, string $opcode = 'text', bool $masked = true): void
+ {
+ if (!$this->isConnected()) {
+ $this->connect();
}
- $scheme = $url_parts['scheme'];
- $host = $url_parts['host'];
- $user = isset($url_parts['user']) ? $url_parts['user'] : '';
- $pass = isset($url_parts['pass']) ? $url_parts['pass'] : '';
- $port = isset($url_parts['port']) ? $url_parts['port'] : ($scheme === 'wss' ? 443 : 80);
- $path = isset($url_parts['path']) ? $url_parts['path'] : '/';
- $query = isset($url_parts['query']) ? $url_parts['query'] : '';
- $fragment = isset($url_parts['fragment']) ? $url_parts['fragment'] : '';
-
- $path_with_query = $path;
- if (!empty($query)) {
- $path_with_query .= '?' . $query;
+
+ if (!in_array($opcode, array_keys(self::$opcodes))) {
+ $warning = "Bad opcode '{$opcode}'. Try 'text' or 'binary'.";
+ $this->logger->warning($warning);
+ throw new BadOpcodeException($warning);
}
- if (!empty($fragment)) {
- $path_with_query .= '#' . $fragment;
+
+ $factory = new Factory();
+ $message = $factory->create($opcode, $payload);
+ $this->connection->pushMessage($message, $masked);
+ }
+
+ /**
+ * Tell the socket to close.
+ * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
+ * @param string $message A closing message, max 125 bytes.
+ */
+ public function close(int $status = 1000, string $message = 'ttfn'): void
+ {
+ if (!$this->isConnected()) {
+ return;
}
+ $this->connection->close($status, $message);
+ }
- if (!in_array($scheme, ['ws', 'wss'])) {
- $error = "Url should have scheme ws or wss, not '{$scheme}' from URI '{$this->socket_uri}'.";
- $this->logger->error($error);
- throw new BadUriException($error);
+ /**
+ * Disconnect from server.
+ */
+ public function disconnect(): void
+ {
+ if ($this->isConnected()) {
+ $this->connection->disconnect();
}
+ }
- $host_uri = ($scheme === 'wss' ? 'ssl' : 'tcp') . '://' . $host;
+ /**
+ * Receive message.
+ * Note that this operation will block reading.
+ * @return mixed Message, text or null depending on settings.
+ */
+ public function receive()
+ {
+ $filter = $this->options['filter'];
+ $return_obj = $this->options['return_obj'];
+
+ if (!$this->isConnected()) {
+ $this->connect();
+ }
+
+ while (true) {
+ $message = $this->connection->pullMessage();
+ $opcode = $message->getOpcode();
+ if (in_array($opcode, $filter)) {
+ $this->last_opcode = $opcode;
+ $return = $return_obj ? $message : $message->getContent();
+ break;
+ } elseif ($opcode == 'close') {
+ $this->last_opcode = null;
+ $return = $return_obj ? $message : null;
+ break;
+ }
+ }
+ return $return;
+ }
+
+
+ /* ---------- Connection functions ----------------------------------------------- */
+
+ /**
+ * Get last received opcode.
+ * @return string|null Opcode.
+ */
+ public function getLastOpcode(): ?string
+ {
+ return $this->last_opcode;
+ }
+
+ /**
+ * Get close status on connection.
+ * @return int|null Close status.
+ */
+ public function getCloseStatus(): ?int
+ {
+ return $this->connection ? $this->connection->getCloseStatus() : null;
+ }
+
+ /**
+ * If Client has active connection.
+ * @return bool True if active connection.
+ */
+ public function isConnected(): bool
+ {
+ return $this->connection && $this->connection->isConnected();
+ }
+
+ /**
+ * Get name of local socket, or null if not connected.
+ * @return string|null
+ */
+ public function getName(): ?string
+ {
+ return $this->isConnected() ? $this->connection->getName() : null;
+ }
+
+ /**
+ * Get name of remote socket, or null if not connected.
+ * @return string|null
+ */
+ public function getRemoteName(): ?string
+ {
+ return $this->isConnected() ? $this->connection->getRemoteName() : null;
+ }
+
+ /**
+ * Get name of remote socket, or null if not connected.
+ * @return string|null
+ * @deprecated Will be removed in future version, use getPeer() instead.
+ */
+ public function getPier(): ?string
+ {
+ trigger_error(
+ 'getPier() is deprecated and will be removed in future version. Use getRemoteName() instead.',
+ E_USER_DEPRECATED
+ );
+ return $this->getRemoteName();
+ }
+
+
+ /* ---------- Helper functions --------------------------------------------------- */
+
+ /**
+ * Perform WebSocket handshake
+ */
+ protected function connect(): void
+ {
+ $this->connection = null;
+
+ $host_uri = (new Uri())
+ ->withHost($this->socket_uri->getHost())
+ ->withScheme($this->socket_uri->getScheme() == 'wss' ? 'ssl' : 'tcp')
+ ->withPort($this->socket_uri->getPort());
+
+ $http_uri = (new Uri())
+ ->withPath($this->socket_uri->getPath())
+ ->withQuery($this->socket_uri->getQuery())
+ ->withFragment($this->socket_uri->getFragment());
// Set the stream context options if they're already set in the config
if (isset($this->options['context'])) {
@@ -103,43 +332,49 @@ protected function connect(): void
$persistent = $this->options['persistent'] === true;
$flags = STREAM_CLIENT_CONNECT;
$flags = $persistent ? $flags | STREAM_CLIENT_PERSISTENT : $flags;
+ $socket = null;
- $error = $errno = $errstr = null;
- set_error_handler(function (int $severity, string $message, string $file, int $line) use (&$error) {
- $this->logger->warning($message, ['severity' => $severity]);
- $error = $message;
- }, E_ALL);
-
- // Open the socket.
- $this->socket = stream_socket_client(
- "{$host_uri}:{$port}",
- $errno,
- $errstr,
- $this->options['timeout'],
- $flags,
- $context
- );
-
- restore_error_handler();
+ try {
+ $handler = new ErrorHandler();
+ $socket = $handler->with(function () use ($host_uri, $flags, $context) {
+ $error = $errno = $errstr = null;
+ // Open the socket.
+ return stream_socket_client(
+ $host_uri,
+ $errno,
+ $errstr,
+ $this->options['timeout'],
+ $flags,
+ $context
+ );
+ });
+ if (!$socket) {
+ throw new ErrorException('No socket');
+ }
+ } catch (ErrorException $e) {
+ $error = "Could not open socket to \"{$host_uri->getAuthority()}\": {$e->getMessage()} ({$e->getCode()}).";
+ $this->logger->error($error, ['severity' => $e->getSeverity()]);
+ throw new ConnectionException($error, 0, [], $e);
+ }
+ $this->connection = new Connection($socket, $this->options);
+ $this->connection->setLogger($this->logger);
if (!$this->isConnected()) {
- $error = "Could not open socket to \"{$host}:{$port}\": {$errstr} ({$errno}) {$error}.";
+ $error = "Invalid stream on \"{$host_uri->getAuthority()}\".";
$this->logger->error($error);
throw new ConnectionException($error);
}
- $address = "{$scheme}://{$host}{$path_with_query}";
-
- if (!$persistent || ftell($this->socket) == 0) {
+ if (!$persistent || $this->connection->tell() == 0) {
// Set timeout on the stream as well.
- stream_set_timeout($this->socket, $this->options['timeout']);
+ $this->connection->setTimeout($this->options['timeout']);
// Generate the WebSocket key.
$key = self::generateKey();
// Default headers
$headers = [
- 'Host' => $host . ":" . $port,
+ 'Host' => $host_uri->getAuthority(),
'User-Agent' => 'websocket-client-php',
'Connection' => 'Upgrade',
'Upgrade' => 'websocket',
@@ -148,8 +383,8 @@ protected function connect(): void
];
// Handle basic authentication.
- if ($user || $pass) {
- $headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass);
+ if ($userinfo = $this->socket_uri->getUserInfo()) {
+ $headers['authorization'] = 'Basic ' . base64_encode($userinfo);
}
// Deprecated way of adding origin (use headers instead).
@@ -162,7 +397,7 @@ protected function connect(): void
$headers = array_merge($headers, $this->options['headers']);
}
- $header = "GET " . $path_with_query . " HTTP/1.1\r\n" . implode(
+ $header = "GET {$http_uri} HTTP/1.1\r\n" . implode(
"\r\n",
array_map(
function ($key, $value) {
@@ -174,31 +409,34 @@ function ($key, $value) {
) . "\r\n\r\n";
// Send headers.
- $this->write($header);
+ $this->connection->write($header);
// Get server response header (terminated with double CR+LF).
$response = '';
- do {
- $buffer = fgets($this->socket, 1024);
- if ($buffer === false) {
- $meta = stream_get_meta_data($this->socket);
- $message = 'Client handshake error';
- $this->logger->error($message, $meta);
- throw new ConnectionException($message);
- }
- $response .= $buffer;
- } while (substr_count($response, "\r\n\r\n") == 0);
+ try {
+ do {
+ $buffer = $this->connection->gets(1024);
+ $response .= $buffer;
+ } while (substr_count($response, "\r\n\r\n") == 0);
+ } catch (Exception $e) {
+ throw new ConnectionException('Client handshake error', $e->getCode(), $e->getData(), $e);
+ }
// Validate response.
if (!preg_match('#Sec-WebSocket-Accept:\s(.*)$#mUi', $response, $matches)) {
- $error = "Connection to '{$address}' failed: Server sent invalid upgrade response: {$response}";
+ $error = sprintf(
+ "Connection to '%s' failed: Server sent invalid upgrade response: %s",
+ (string)$this->socket_uri,
+ (string)$response
+ );
$this->logger->error($error);
throw new ConnectionException($error);
}
$keyAccept = trim($matches[1]);
- $expectedResonse
- = base64_encode(pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
+ $expectedResonse = base64_encode(
+ pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))
+ );
if ($keyAccept !== $expectedResonse) {
$error = 'Server sent bad upgrade response.';
@@ -207,12 +445,11 @@ function ($key, $value) {
}
}
- $this->logger->info("Client connected to {$address}");
+ $this->logger->info("Client connected to {$this->socket_uri}");
}
/**
* Generate a random string for WebSocket key.
- *
* @return string Random string
*/
protected static function generateKey(): string
@@ -223,4 +460,23 @@ protected static function generateKey(): string
}
return base64_encode($key);
}
+
+ protected function parseUri($uri): UriInterface
+ {
+ if ($uri instanceof UriInterface) {
+ $uri = $uri;
+ } elseif (is_string($uri)) {
+ try {
+ $uri = new Uri($uri);
+ } catch (InvalidArgumentException $e) {
+ throw new BadUriException("Invalid URI '{$uri}' provided.", 0, $e);
+ }
+ } else {
+ throw new BadUriException("Provided URI must be a UriInterface or string.");
+ }
+ if (!in_array($uri->getScheme(), ['ws', 'wss'])) {
+ throw new BadUriException("Invalid URI scheme, must be 'ws' or 'wss'.");
+ }
+ return $uri;
+ }
}
diff --git a/lib/Connection.php b/lib/Connection.php
new file mode 100644
index 0000000..d5aa48b
--- /dev/null
+++ b/lib/Connection.php
@@ -0,0 +1,518 @@
+stream = $stream;
+ $this->setOptions($options);
+ $this->setLogger(new NullLogger());
+ $this->msg_factory = new Factory();
+ }
+
+ public function __destruct()
+ {
+ if ($this->getType() === 'stream') {
+ fclose($this->stream);
+ }
+ }
+
+ public function setOptions(array $options = []): void
+ {
+ $this->options = array_merge($this->options, $options);
+ }
+
+ public function getCloseStatus(): ?int
+ {
+ return $this->close_status;
+ }
+
+ /**
+ * Tell the socket to close.
+ *
+ * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
+ * @param string $message A closing message, max 125 bytes.
+ */
+ public function close(int $status = 1000, string $message = 'ttfn'): void
+ {
+ if (!$this->isConnected()) {
+ return;
+ }
+ $status_binstr = sprintf('%016b', $status);
+ $status_str = '';
+ foreach (str_split($status_binstr, 8) as $binstr) {
+ $status_str .= chr(bindec($binstr));
+ }
+ $message = $this->msg_factory->create('close', $status_str . $message);
+ $this->pushMessage($message, true);
+
+ $this->logger->debug("Closing with status: {$status}.");
+
+ $this->is_closing = true;
+ while (true) {
+ $message = $this->pullMessage();
+ if ($message->getOpcode() == 'close') {
+ break;
+ }
+ }
+ }
+
+
+ /* ---------- Message methods ---------------------------------------------------- */
+
+ // Push a message to stream
+ public function pushMessage(Message $message, bool $masked = true): void
+ {
+ $frames = $message->getFrames($masked, $this->options['fragment_size']);
+ foreach ($frames as $frame) {
+ $this->pushFrame($frame);
+ }
+ $this->logger->info("[connection] Pushed {$message}", [
+ 'opcode' => $message->getOpcode(),
+ 'content-length' => $message->getLength(),
+ 'frames' => count($frames),
+ ]);
+ }
+
+ // Pull a message from stream
+ public function pullMessage(): Message
+ {
+ do {
+ $frame = $this->pullFrame();
+ $frame = $this->autoRespond($frame);
+ list ($final, $payload, $opcode, $masked) = $frame;
+
+ if ($opcode == 'close') {
+ $this->close();
+ }
+
+ // Continuation and factual opcode
+ $continuation = $opcode == 'continuation';
+ $payload_opcode = $continuation ? $this->read_buffer['opcode'] : $opcode;
+
+ // First continuation frame, create buffer
+ if (!$final && !$continuation) {
+ $this->read_buffer = ['opcode' => $opcode, 'payload' => $payload, 'frames' => 1];
+ continue; // Continue reading
+ }
+
+ // Subsequent continuation frames, add to buffer
+ if ($continuation) {
+ $this->read_buffer['payload'] .= $payload;
+ $this->read_buffer['frames']++;
+ }
+ } while (!$final);
+
+ // Final, return payload
+ $frames = 1;
+ if ($continuation) {
+ $payload = $this->read_buffer['payload'];
+ $frames = $this->read_buffer['frames'];
+ $this->read_buffer = null;
+ }
+
+ $message = $this->msg_factory->create($payload_opcode, $payload);
+
+ $this->logger->info("[connection] Pulled {$message}", [
+ 'opcode' => $payload_opcode,
+ 'content-length' => strlen($payload),
+ 'frames' => $frames,
+ ]);
+
+ return $message;
+ }
+
+
+ /* ---------- Frame I/O methods -------------------------------------------------- */
+
+ // Pull frame from stream
+ private function pullFrame(): array
+ {
+ // Read the fragment "header" first, two bytes.
+ $data = $this->read(2);
+ list ($byte_1, $byte_2) = array_values(unpack('C*', $data));
+ $final = (bool)($byte_1 & 0b10000000); // Final fragment marker.
+ $rsv = $byte_1 & 0b01110000; // Unused bits, ignore
+
+ // Parse opcode
+ $opcode_int = $byte_1 & 0b00001111;
+ $opcode_ints = array_flip(self::$opcodes);
+ if (!array_key_exists($opcode_int, $opcode_ints)) {
+ $warning = "Bad opcode in websocket frame: {$opcode_int}";
+ $this->logger->warning($warning);
+ throw new ConnectionException($warning, ConnectionException::BAD_OPCODE);
+ }
+ $opcode = $opcode_ints[$opcode_int];
+
+ // Masking bit
+ $masked = (bool)($byte_2 & 0b10000000);
+
+ $payload = '';
+
+ // Payload length
+ $payload_length = $byte_2 & 0b01111111;
+
+ if ($payload_length > 125) {
+ if ($payload_length === 126) {
+ $data = $this->read(2); // 126: Payload is a 16-bit unsigned int
+ $payload_length = current(unpack('n', $data));
+ } else {
+ $data = $this->read(8); // 127: Payload is a 64-bit unsigned int
+ $payload_length = current(unpack('J', $data));
+ }
+ }
+
+ // Get masking key.
+ if ($masked) {
+ $masking_key = $this->read(4);
+ }
+
+ // Get the actual payload, if any (might not be for e.g. close frames.
+ if ($payload_length > 0) {
+ $data = $this->read($payload_length);
+
+ if ($masked) {
+ // Unmask payload.
+ for ($i = 0; $i < $payload_length; $i++) {
+ $payload .= ($data[$i] ^ $masking_key[$i % 4]);
+ }
+ } else {
+ $payload = $data;
+ }
+ }
+
+ $this->logger->debug("[connection] Pulled '{opcode}' frame", [
+ 'opcode' => $opcode,
+ 'final' => $final,
+ 'content-length' => strlen($payload),
+ ]);
+ return [$final, $payload, $opcode, $masked];
+ }
+
+ // Push frame to stream
+ private function pushFrame(array $frame): void
+ {
+ list ($final, $payload, $opcode, $masked) = $frame;
+ $data = '';
+ $byte_1 = $final ? 0b10000000 : 0b00000000; // Final fragment marker.
+ $byte_1 |= self::$opcodes[$opcode]; // Set opcode.
+ $data .= pack('C', $byte_1);
+
+ $byte_2 = $masked ? 0b10000000 : 0b00000000; // Masking bit marker.
+
+ // 7 bits of payload length...
+ $payload_length = strlen($payload);
+ if ($payload_length > 65535) {
+ $data .= pack('C', $byte_2 | 0b01111111);
+ $data .= pack('J', $payload_length);
+ } elseif ($payload_length > 125) {
+ $data .= pack('C', $byte_2 | 0b01111110);
+ $data .= pack('n', $payload_length);
+ } else {
+ $data .= pack('C', $byte_2 | $payload_length);
+ }
+
+ // Handle masking
+ if ($masked) {
+ // generate a random mask:
+ $mask = '';
+ for ($i = 0; $i < 4; $i++) {
+ $mask .= chr(rand(0, 255));
+ }
+ $data .= $mask;
+
+ // Append payload to frame:
+ for ($i = 0; $i < $payload_length; $i++) {
+ $data .= $payload[$i] ^ $mask[$i % 4];
+ }
+ } else {
+ $data .= $payload;
+ }
+
+ $this->write($data);
+
+ $this->logger->debug("[connection] Pushed '{$opcode}' frame", [
+ 'opcode' => $opcode,
+ 'final' => $final,
+ 'content-length' => strlen($payload),
+ ]);
+ }
+
+ // Trigger auto response for frame
+ private function autoRespond(array $frame)
+ {
+ list ($final, $payload, $opcode, $masked) = $frame;
+ $payload_length = strlen($payload);
+
+ switch ($opcode) {
+ case 'ping':
+ // If we received a ping, respond with a pong
+ $this->logger->debug("[connection] Received 'ping', sending 'pong'.");
+ $message = $this->msg_factory->create('pong', $payload);
+ $this->pushMessage($message, $masked);
+ return [$final, $payload, $opcode, $masked];
+ case 'close':
+ // If we received close, possibly acknowledge and close connection
+ $status_bin = '';
+ $status = '';
+ if ($payload_length > 0) {
+ $status_bin = $payload[0] . $payload[1];
+ $status = current(unpack('n', $payload));
+ $this->close_status = $status;
+ }
+ // Get additional close message
+ if ($payload_length >= 2) {
+ $payload = substr($payload, 2);
+ }
+
+ $this->logger->debug("[connection] Received 'close', status: {$status}.");
+ if (!$this->is_closing) {
+ $ack = "{$status_bin}Close acknowledged: {$status}";
+ $message = $this->msg_factory->create('close', $ack);
+ $this->pushMessage($message, $masked);
+ } else {
+ $this->is_closing = false; // A close response, all done.
+ }
+ $this->disconnect();
+ return [$final, $payload, $opcode, $masked];
+ default:
+ return [$final, $payload, $opcode, $masked];
+ }
+ }
+
+
+ /* ---------- Stream I/O methods ------------------------------------------------- */
+
+ /**
+ * Close connection stream.
+ * @return bool
+ */
+ public function disconnect(): bool
+ {
+ $this->logger->debug('Closing connection');
+ return fclose($this->stream);
+ }
+
+ /**
+ * If connected to stream.
+ * @return bool
+ */
+ public function isConnected(): bool
+ {
+ return in_array($this->getType(), ['stream', 'persistent stream']);
+ }
+
+ /**
+ * Return type of connection.
+ * @return string|null Type of connection or null if invalid type.
+ */
+ public function getType(): ?string
+ {
+ return get_resource_type($this->stream);
+ }
+
+ /**
+ * Get name of local socket, or null if not connected.
+ * @return string|null
+ */
+ public function getName(): ?string
+ {
+ return stream_socket_get_name($this->stream, false);
+ }
+
+ /**
+ * Get name of remote socket, or null if not connected.
+ * @return string|null
+ */
+ public function getRemoteName(): ?string
+ {
+ return stream_socket_get_name($this->stream, true);
+ }
+
+ /**
+ * Get meta data for connection.
+ * @return array
+ */
+ public function getMeta(): array
+ {
+ return stream_get_meta_data($this->stream);
+ }
+
+ /**
+ * Returns current position of stream pointer.
+ * @return int
+ * @throws ConnectionException
+ */
+ public function tell(): int
+ {
+ $tell = ftell($this->stream);
+ if ($tell === false) {
+ $this->throwException('Could not resolve stream pointer position');
+ }
+ return $tell;
+ }
+
+ /**
+ * If stream pointer is at end of file.
+ * @return bool
+ */
+ public function eof(): int
+ {
+ return feof($this->stream);
+ }
+
+
+ /* ---------- Stream option methods ---------------------------------------------- */
+
+ /**
+ * Set time out on connection.
+ * @param int $seconds Timeout part in seconds
+ * @param int $microseconds Timeout part in microseconds
+ * @return bool
+ */
+ public function setTimeout(int $seconds, int $microseconds = 0): bool
+ {
+ $this->logger->debug("Setting timeout {$seconds}:{$microseconds} seconds");
+ return stream_set_timeout($this->stream, $seconds, $microseconds);
+ }
+
+
+ /* ---------- Stream read/write methods ------------------------------------------ */
+
+ /**
+ * Read line from stream.
+ * @param int $length Maximum number of bytes to read
+ * @param string $ending Line delimiter
+ * @return string Read data
+ */
+ public function getLine(int $length, string $ending): string
+ {
+ $line = stream_get_line($this->stream, $length, $ending);
+ if ($line === false) {
+ $this->throwException('Could not read from stream');
+ }
+ $read = strlen($line);
+ $this->logger->debug("Read {$read} bytes of line.");
+ return $line;
+ }
+
+ /**
+ * Read line from stream.
+ * @param int $length Maximum number of bytes to read
+ * @return string Read data
+ */
+ public function gets(int $length): string
+ {
+ $line = fgets($this->stream, $length);
+ if ($line === false) {
+ $this->throwException('Could not read from stream');
+ }
+ $read = strlen($line);
+ $this->logger->debug("Read {$read} bytes of line.");
+ return $line;
+ }
+
+ /**
+ * Read characters from stream.
+ * @param int $length Maximum number of bytes to read
+ * @return string Read data
+ */
+ public function read(string $length): string
+ {
+ $data = '';
+ while (strlen($data) < $length) {
+ $buffer = fread($this->stream, $length - strlen($data));
+ if (!$buffer) {
+ $meta = stream_get_meta_data($this->stream);
+ if (!empty($meta['timed_out'])) {
+ $message = 'Client read timeout';
+ $this->logger->error($message, $meta);
+ throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
+ }
+ }
+ if ($buffer === false) {
+ $read = strlen($data);
+ $this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
+ }
+ if ($buffer === '') {
+ $this->throwException("Empty read; connection dead?");
+ }
+ $data .= $buffer;
+ $read = strlen($data);
+ $this->logger->debug("Read {$read} of {$length} bytes.");
+ }
+ return $data;
+ }
+
+ /**
+ * Write characters to stream.
+ * @param string $data Data to read
+ */
+ public function write(string $data): void
+ {
+ $length = strlen($data);
+ $written = fwrite($this->stream, $data);
+ if ($written === false) {
+ $this->throwException("Failed to write {$length} bytes.");
+ }
+ if ($written < strlen($data)) {
+ $this->throwException("Could only write {$written} out of {$length} bytes.");
+ }
+ $this->logger->debug("Wrote {$written} of {$length} bytes.");
+ }
+
+
+ /* ---------- Internal helper methods -------------------------------------------- */
+
+ private function throwException(string $message, int $code = 0): void
+ {
+ $meta = ['closed' => true];
+ if ($this->isConnected()) {
+ $meta = $this->getMeta();
+ $this->disconnect();
+ if (!empty($meta['timed_out'])) {
+ $this->logger->error($message, $meta);
+ throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
+ }
+ if (!empty($meta['eof'])) {
+ $code = ConnectionException::EOF;
+ }
+ }
+ $this->logger->error($message, $meta);
+ throw new ConnectionException($message, $code, $meta);
+ }
+}
diff --git a/lib/ConnectionException.php b/lib/ConnectionException.php
index 7e1ecbf..aa1d7f4 100644
--- a/lib/ConnectionException.php
+++ b/lib/ConnectionException.php
@@ -1,5 +1,12 @@
getContent(), $framesize) ?: [''];
+ foreach ($split as $payload) {
+ $frames[] = [false, $payload, 'continuation', $masked];
+ }
+ $frames[0][2] = $this->opcode;
+ $frames[array_key_last($frames)][0] = true;
+ return $frames;
+ }
}
diff --git a/lib/Message/Ping.php b/lib/Message/Ping.php
index 908d233..f9bb652 100644
--- a/lib/Message/Ping.php
+++ b/lib/Message/Ping.php
@@ -1,5 +1,12 @@
0,
+ 'text' => 1,
+ 'binary' => 2,
+ 'close' => 8,
+ 'ping' => 9,
+ 'pong' => 10,
+ ];
+}
diff --git a/lib/Server.php b/lib/Server.php
index ae9325f..1521588 100644
--- a/lib/Server.php
+++ b/lib/Server.php
@@ -1,7 +1,7 @@
['text', 'binary'],
- 'fragment_size' => 4096,
- 'logger' => null,
- 'port' => 8000,
- 'return_obj' => false,
- 'timeout' => null,
+ 'filter' => ['text', 'binary'],
+ 'fragment_size' => 4096,
+ 'logger' => null,
+ 'port' => 8000,
+ 'return_obj' => false,
+ 'timeout' => null,
];
- protected $addr;
protected $port;
protected $listening;
protected $request;
protected $request_path;
+ private $connections = [];
+ private $options = [];
+ private $listen = false;
+ private $last_opcode;
+
+
+ /* ---------- Magic methods ------------------------------------------------------ */
/**
* @param array $options
* Associative array containing:
- * - timeout: Set the socket timeout in seconds.
+ * - filter: Array of opcodes to handle. Default: ['text', 'binary'].
* - fragment_size: Set framgemnt size. Default: 4096
- * - port: Chose port for listening. Default 8000.
+ * - logger: PSR-3 compatible logger. Default NullLogger.
+ * - port: Chose port for listening. Default 8000.
+ * - return_obj: If receive() function return Message instance. Default false.
+ * - timeout: Set the socket timeout in seconds.
*/
public function __construct(array $options = [])
{
- $this->options = array_merge(self::$default_options, $options);
+ $this->options = array_merge(self::$default_options, [
+ 'logger' => new NullLogger(),
+ ], $options);
$this->port = $this->options['port'];
$this->setLogger($this->options['logger']);
@@ -61,29 +87,231 @@ public function __construct(array $options = [])
$this->logger->info("Server listening to port {$this->port}");
}
- public function __destruct()
+ /**
+ * Get string representation of instance.
+ * @return string String representation.
+ */
+ public function __toString(): string
{
- if ($this->isConnected()) {
- fclose($this->socket);
- }
- $this->socket = null;
+ return sprintf(
+ "%s(%s)",
+ get_class($this),
+ $this->getName() ?: 'closed'
+ );
+ }
+
+
+ /* ---------- Server operations -------------------------------------------------- */
+
+ /**
+ * Accept a single incoming request.
+ * Note that this operation will block accepting additional requests.
+ * @return bool True if listening.
+ */
+ public function accept(): bool
+ {
+ $this->disconnect();
+ return (bool)$this->listening;
}
+
+ /* ---------- Server option functions -------------------------------------------- */
+
+ /**
+ * Get current port.
+ * @return int port.
+ */
public function getPort(): int
{
return $this->port;
}
+ /**
+ * Set timeout.
+ * @param int $timeout Timeout in seconds.
+ */
+ public function setTimeout(int $timeout): void
+ {
+ $this->options['timeout'] = $timeout;
+ if (!$this->isConnected()) {
+ return;
+ }
+ foreach ($this->connections as $connection) {
+ $connection->setTimeout($timeout);
+ $connection->setOptions($this->options);
+ }
+ }
+
+ /**
+ * Set fragmentation size.
+ * @param int $fragment_size Fragment size in bytes.
+ * @return self.
+ */
+ public function setFragmentSize(int $fragment_size): self
+ {
+ $this->options['fragment_size'] = $fragment_size;
+ foreach ($this->connections as $connection) {
+ $connection->setOptions($this->options);
+ }
+ return $this;
+ }
+
+ /**
+ * Get fragmentation size.
+ * @return int $fragment_size Fragment size in bytes.
+ */
+ public function getFragmentSize(): int
+ {
+ return $this->options['fragment_size'];
+ }
+
+
+ /* ---------- Connection broadcast operations ------------------------------------ */
+
+ /**
+ * Broadcast text message to all conenctions.
+ * @param string $payload Content as string.
+ */
+ public function text(string $payload): void
+ {
+ $this->send($payload);
+ }
+
+ /**
+ * Broadcast binary message to all conenctions.
+ * @param string $payload Content as binary string.
+ */
+ public function binary(string $payload): void
+ {
+ $this->send($payload, 'binary');
+ }
+
+ /**
+ * Broadcast ping message to all conenctions.
+ * @param string $payload Optional text as string.
+ */
+ public function ping(string $payload = ''): void
+ {
+ $this->send($payload, 'ping');
+ }
+
+ /**
+ * Broadcast pong message to all conenctions.
+ * @param string $payload Optional text as string.
+ */
+ public function pong(string $payload = ''): void
+ {
+ $this->send($payload, 'pong');
+ }
+
+ /**
+ * Send message on all connections.
+ * @param string $payload Message to send.
+ * @param string $opcode Opcode to use, default: 'text'.
+ * @param bool $masked If message should be masked default: true.
+ */
+ public function send(string $payload, string $opcode = 'text', bool $masked = true): void
+ {
+ if (!$this->isConnected()) {
+ $this->connect();
+ }
+ if (!in_array($opcode, array_keys(self::$opcodes))) {
+ $warning = "Bad opcode '{$opcode}'. Try 'text' or 'binary'.";
+ $this->logger->warning($warning);
+ throw new BadOpcodeException($warning);
+ }
+
+ $factory = new Factory();
+ $message = $factory->create($opcode, $payload);
+
+ foreach ($this->connections as $connection) {
+ $connection->pushMessage($message, $masked);
+ }
+ }
+
+ /**
+ * Close all connections.
+ * @param int $status Close status, default: 1000.
+ * @param string $message Close message, default: 'ttfn'.
+ */
+ public function close(int $status = 1000, string $message = 'ttfn'): void
+ {
+ foreach ($this->connections as $connection) {
+ if ($connection->isConnected()) {
+ $connection->close($status, $message);
+ }
+ }
+ }
+
+ /**
+ * Disconnect all connections.
+ */
+ public function disconnect(): void
+ {
+ foreach ($this->connections as $connection) {
+ if ($connection->isConnected()) {
+ $connection->disconnect();
+ }
+ }
+ $this->connections = [];
+ }
+
+ /**
+ * Receive message from single connection.
+ * Note that this operation will block reading and only read from first available connection.
+ * @return mixed Message, text or null depending on settings.
+ */
+ public function receive()
+ {
+ $filter = $this->options['filter'];
+ $return_obj = $this->options['return_obj'];
+
+ if (!$this->isConnected()) {
+ $this->connect();
+ }
+ $connection = current($this->connections);
+
+ while (true) {
+ $message = $connection->pullMessage();
+ $opcode = $message->getOpcode();
+ if (in_array($opcode, $filter)) {
+ $this->last_opcode = $opcode;
+ $return = $return_obj ? $message : $message->getContent();
+ break;
+ } elseif ($opcode == 'close') {
+ $this->last_opcode = null;
+ $return = $return_obj ? $message : null;
+ break;
+ }
+ }
+ return $return;
+ }
+
+
+ /* ---------- Connection functions ----------------------------------------------- */
+
+ /**
+ * Get requested path from last connection.
+ * @return string Path.
+ */
public function getPath(): string
{
return $this->request_path;
}
+ /**
+ * Get request from last connection.
+ * @return array Request.
+ */
public function getRequest(): array
{
return $this->request;
}
+ /**
+ * Get headers from last connection.
+ * @return string|null Headers.
+ */
public function getHeader($header): ?string
{
foreach ($this->request as $row) {
@@ -95,51 +323,117 @@ public function getHeader($header): ?string
return null;
}
- public function accept(): bool
+ /**
+ * Get last received opcode.
+ * @return string|null Opcode.
+ */
+ public function getLastOpcode(): ?string
{
- $this->socket = null;
- return (bool)$this->listening;
+ return $this->last_opcode;
}
- protected function connect(): void
+ /**
+ * Get close status from single connection.
+ * @return int|null Close status.
+ */
+ public function getCloseStatus(): ?int
{
+ return $this->connections ? current($this->connections)->getCloseStatus() : null;
+ }
- $error = null;
- set_error_handler(function (int $severity, string $message, string $file, int $line) use (&$error) {
- $this->logger->warning($message, ['severity' => $severity]);
- $error = $message;
- }, E_ALL);
-
- if (isset($this->options['timeout'])) {
- $this->socket = stream_socket_accept($this->listening, $this->options['timeout']);
- } else {
- $this->socket = stream_socket_accept($this->listening);
+ /**
+ * If Server has active connections.
+ * @return bool True if active connection.
+ */
+ public function isConnected(): bool
+ {
+ foreach ($this->connections as $connection) {
+ if ($connection->isConnected()) {
+ return true;
+ }
}
+ return false;
+ }
- restore_error_handler();
+ /**
+ * Get name of local socket from single connection.
+ * @return string|null Name of local socket.
+ */
+ public function getName(): ?string
+ {
+ return $this->isConnected() ? current($this->connections)->getName() : null;
+ }
- if (!$this->socket) {
- $this->throwException("Server failed to connect. {$error}");
+ /**
+ * Get name of remote socket from single connection.
+ * @return string|null Name of remote socket.
+ */
+ public function getRemoteName(): ?string
+ {
+ return $this->isConnected() ? current($this->connections)->getRemoteName() : null;
+ }
+
+ /**
+ * @deprecated Will be removed in future version.
+ */
+ public function getPier(): ?string
+ {
+ trigger_error(
+ 'getPier() is deprecated and will be removed in future version. Use getRemoteName() instead.',
+ E_USER_DEPRECATED
+ );
+ return $this->getRemoteName();
+ }
+
+
+ /* ---------- Helper functions --------------------------------------------------- */
+
+ // Connect when read/write operation is performed.
+ private function connect(): void
+ {
+ try {
+ $handler = new ErrorHandler();
+ $socket = $handler->with(function () {
+ if (isset($this->options['timeout'])) {
+ $socket = stream_socket_accept($this->listening, $this->options['timeout']);
+ } else {
+ $socket = stream_socket_accept($this->listening);
+ }
+ if (!$socket) {
+ throw new ErrorException('No socket');
+ }
+ return $socket;
+ });
+ } catch (ErrorException $e) {
+ $error = "Server failed to connect. {$e->getMessage()}";
+ $this->logger->error($error, ['severity' => $e->getSeverity()]);
+ throw new ConnectionException($error, 0, [], $e);
}
+
+ $connection = new Connection($socket, $this->options);
+ $connection->setLogger($this->logger);
+
if (isset($this->options['timeout'])) {
- stream_set_timeout($this->socket, $this->options['timeout']);
+ $connection->setTimeout($this->options['timeout']);
}
$this->logger->info("Client has connected to port {port}", [
'port' => $this->port,
- 'pier' => stream_socket_get_name($this->socket, true),
+ 'peer' => $connection->getRemoteName(),
]);
- $this->performHandshake();
+ $this->performHandshake($connection);
+ $this->connections = ['*' => $connection];
}
- protected function performHandshake(): void
+ // Perform upgrade handshake on new connections.
+ private function performHandshake(Connection $connection): void
{
$request = '';
do {
- $buffer = stream_get_line($this->socket, 1024, "\r\n");
+ $buffer = $connection->getLine(1024, "\r\n");
$request .= $buffer . "\n";
- $metadata = stream_get_meta_data($this->socket);
- } while (!feof($this->socket) && $metadata['unread_bytes'] > 0);
+ $metadata = $connection->getMeta();
+ } while (!$connection->eof() && $metadata['unread_bytes'] > 0);
if (!preg_match('/GET (.*) HTTP\//mUi', $request, $matches)) {
$error = "No GET in request: {$request}";
@@ -170,7 +464,7 @@ protected function performHandshake(): void
. "Sec-WebSocket-Accept: $response_key\r\n"
. "\r\n";
- $this->write($header);
+ $connection->write($header);
$this->logger->debug("Handshake on {$get_uri}");
}
}
diff --git a/lib/TimeoutException.php b/lib/TimeoutException.php
index d20e622..7276556 100644
--- a/lib/TimeoutException.php
+++ b/lib/TimeoutException.php
@@ -1,5 +1,12 @@
-
-
-
-
- tests
-
-
-
-
- lib/
-
-
+
+
+
+ lib/
+
+
+
+
+ tests
+
+
diff --git a/tests/ClientTest.php b/tests/ClientTest.php
index e0cf94c..3c68942 100644
--- a/tests/ClientTest.php
+++ b/tests/ClientTest.php
@@ -9,6 +9,9 @@
namespace WebSocket;
+use ErrorException;
+use Phrity\Net\Uri;
+use Phrity\Util\ErrorHandler;
use PHPUnit\Framework\TestCase;
class ClientTest extends TestCase
@@ -225,22 +228,46 @@ public function testPersistentConnection(): void
$this->assertTrue(MockSocket::isEmpty());
}
+ public function testFailedPersistentConnection(): void
+ {
+ MockSocket::initialize('client.connect-persistent-failure', $this);
+ $client = new Client('ws://localhost:8000/my/mock/path', ['persistent' => true]);
+ $this->expectException('WebSocket\ConnectionException');
+ $this->expectExceptionMessage('Could not resolve stream pointer position');
+ $client->send('Connect');
+ }
+
public function testBadScheme(): void
{
MockSocket::initialize('client.connect', $this);
+ $this->expectException('WebSocket\BadUriException');
+ $this->expectExceptionMessage("Invalid URI scheme, must be 'ws' or 'wss'.");
$client = new Client('bad://localhost:8000/my/mock/path');
+ }
+
+ public function testBadUri(): void
+ {
+ MockSocket::initialize('client.connect', $this);
$this->expectException('WebSocket\BadUriException');
- $this->expectExceptionMessage('Url should have scheme ws or wss');
- $client->send('Connect');
+ $this->expectExceptionMessage("Invalid URI '--:this is not an uri:--' provided.");
+ $client = new Client('--:this is not an uri:--');
}
- public function testBadUrl(): void
+ public function testInvalidUriType(): void
{
MockSocket::initialize('client.connect', $this);
- $client = new Client('this is not an url');
$this->expectException('WebSocket\BadUriException');
- $this->expectExceptionMessage('Invalid url \'this is not an url\' provided.');
+ $this->expectExceptionMessage("Provided URI must be a UriInterface or string.");
+ $client = new Client([]);
+ }
+
+ public function testUriInterface(): void
+ {
+ MockSocket::initialize('client.connect', $this);
+ $uri = new Uri('ws://localhost:8000/my/mock/path');
+ $client = new Client($uri);
$client->send('Connect');
+ $this->assertTrue(MockSocket::isEmpty());
}
public function testBadStreamContext(): void
@@ -272,13 +299,33 @@ public function testFailedConnectionWithError(): void
$client->send('Connect');
}
+ public function testBadStreamConnection(): void
+ {
+ MockSocket::initialize('client.connect-bad-stream', $this);
+ $client = new Client('ws://localhost:8000/my/mock/path');
+ $this->expectException('WebSocket\ConnectionException');
+ $this->expectExceptionCode(0);
+ $this->expectExceptionMessage('Invalid stream on "localhost:8000"');
+ $client->send('Connect');
+ }
+
+ public function testHandshakeFailure(): void
+ {
+ MockSocket::initialize('client.connect-handshake-failure', $this);
+ $client = new Client('ws://localhost:8000/my/mock/path');
+ $this->expectException('WebSocket\ConnectionException');
+ $this->expectExceptionCode(0);
+ $this->expectExceptionMessage('Client handshake error');
+ $client->send('Connect');
+ }
+
public function testInvalidUpgrade(): void
{
MockSocket::initialize('client.connect-invalid-upgrade', $this);
$client = new Client('ws://localhost:8000/my/mock/path');
$this->expectException('WebSocket\ConnectionException');
$this->expectExceptionCode(0);
- $this->expectExceptionMessage('Connection to \'ws://localhost/my/mock/path\' failed');
+ $this->expectExceptionMessage('Connection to \'ws://localhost:8000/my/mock/path\' failed');
$client->send('Connect');
}
@@ -357,7 +404,7 @@ public function testHandshakeError(): void
MockSocket::initialize('client.connect-handshake-error', $this);
$client = new Client('ws://localhost:8000/my/mock/path');
$this->expectException('WebSocket\ConnectionException');
- $this->expectExceptionCode(0);
+ $this->expectExceptionCode(1024);
$this->expectExceptionMessage('Client handshake error');
$client->send('Connect');
}
@@ -444,7 +491,7 @@ public function testConvenicanceMethods(): void
MockSocket::initialize('client.connect', $this);
$client = new Client('ws://localhost:8000/my/mock/path');
$this->assertNull($client->getName());
- $this->assertNull($client->getPier());
+ $this->assertNull($client->getRemoteName());
$this->assertEquals('WebSocket\Client(closed)', "{$client}");
$client->text('Connect');
MockSocket::initialize('send-convenicance', $this);
@@ -452,7 +499,32 @@ public function testConvenicanceMethods(): void
$client->ping();
$client->pong();
$this->assertEquals('127.0.0.1:12345', $client->getName());
- $this->assertEquals('127.0.0.1:8000', $client->getPier());
+ $this->assertEquals('127.0.0.1:8000', $client->getRemoteName());
$this->assertEquals('WebSocket\Client(127.0.0.1:12345)', "{$client}");
}
+
+ public function testUnconnectedClient(): void
+ {
+ $client = new Client('ws://localhost:8000/my/mock/path');
+ $this->assertFalse($client->isConnected());
+ $client->setTimeout(30);
+ $client->close();
+ $this->assertFalse($client->isConnected());
+ $this->assertNull($client->getName());
+ $this->assertNull($client->getRemoteName());
+ $this->assertNull($client->getCloseStatus());
+ }
+
+ public function testDeprecated(): void
+ {
+ $client = new Client('ws://localhost:8000/my/mock/path');
+ (new ErrorHandler())->withAll(function () use ($client) {
+ $this->assertNull($client->getPier());
+ }, function ($exceptions, $result) {
+ $this->assertEquals(
+ 'getPier() is deprecated and will be removed in future version. Use getRemoteName() instead.',
+ $exceptions[0]->getMessage()
+ );
+ }, E_USER_DEPRECATED);
+ }
}
diff --git a/tests/README.md b/tests/README.md
index b710a7e..0af2997 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -14,8 +14,7 @@ make test
## Continuous integration
-GitHub Actions are run on PHP versions
-`7.2`, `7.3`, `7.4` and `8.0`.
+GitHub Actions are run on PHP versions `7.4`, `8.0`, `8.1` and `8.2`.
Code coverage by [Coveralls](https://coveralls.io/github/Textalk/websocket-php).
diff --git a/tests/ServerTest.php b/tests/ServerTest.php
index c2370f4..033895f 100644
--- a/tests/ServerTest.php
+++ b/tests/ServerTest.php
@@ -9,6 +9,8 @@
namespace WebSocket;
+use ErrorException;
+use Phrity\Util\ErrorHandler;
use PHPUnit\Framework\TestCase;
class ServerTest extends TestCase
@@ -430,7 +432,7 @@ public function testConvenicanceMethods(): void
MockSocket::initialize('server.construct', $this);
$server = new Server();
$this->assertNull($server->getName());
- $this->assertNull($server->getPier());
+ $this->assertNull($server->getRemoteName());
$this->assertEquals('WebSocket\Server(closed)', "{$server}");
MockSocket::initialize('server.accept', $this);
$server->accept();
@@ -440,8 +442,70 @@ public function testConvenicanceMethods(): void
$server->ping();
$server->pong();
$this->assertEquals('127.0.0.1:12345', $server->getName());
- $this->assertEquals('127.0.0.1:8000', $server->getPier());
+ $this->assertEquals('127.0.0.1:8000', $server->getRemoteName());
$this->assertEquals('WebSocket\Server(127.0.0.1:12345)', "{$server}");
$this->assertTrue(MockSocket::isEmpty());
}
+
+ public function testUnconnectedServer(): void
+ {
+ MockSocket::initialize('server.construct', $this);
+ $server = new Server();
+ $this->assertFalse($server->isConnected());
+ $server->setTimeout(30);
+ $server->close();
+ $this->assertFalse($server->isConnected());
+ $this->assertNull($server->getName());
+ $this->assertNull($server->getRemoteName());
+ $this->assertNull($server->getCloseStatus());
+ $this->assertTrue(MockSocket::isEmpty());
+ }
+
+ public function testFailedHandshake(): void
+ {
+ MockSocket::initialize('server.construct', $this);
+ $server = new Server();
+ $this->assertTrue(MockSocket::isEmpty());
+
+ MockSocket::initialize('server.accept-failed-handshake', $this);
+ $server->accept();
+ $this->expectException('WebSocket\ConnectionException');
+ $this->expectExceptionCode(0);
+ $this->expectExceptionMessage('Could not read from stream');
+ $server->send('Connect');
+ $this->assertFalse($server->isConnected());
+ $this->assertTrue(MockSocket::isEmpty());
+ }
+
+ public function testServerDisconnect(): void
+ {
+ MockSocket::initialize('server.construct', $this);
+ $server = new Server();
+ $this->assertTrue(MockSocket::isEmpty());
+ MockSocket::initialize('server.accept', $this);
+ $server->accept();
+ $server->send('Connect');
+ $this->assertTrue($server->isConnected());
+ $this->assertTrue(MockSocket::isEmpty());
+
+ MockSocket::initialize('server.disconnect', $this);
+ $server->disconnect();
+ $this->assertFalse($server->isConnected());
+ $this->assertTrue(MockSocket::isEmpty());
+ }
+
+ public function testDeprecated(): void
+ {
+ MockSocket::initialize('server.construct', $this);
+ $server = new Server();
+ $this->assertTrue(MockSocket::isEmpty());
+ (new ErrorHandler())->withAll(function () use ($server) {
+ $this->assertNull($server->getPier());
+ }, function ($exceptions, $result) {
+ $this->assertEquals(
+ 'getPier() is deprecated and will be removed in future version. Use getRemoteName() instead.',
+ $exceptions[0]->getMessage()
+ );
+ }, E_USER_DEPRECATED);
+ }
}
diff --git a/tests/scripts/client.close.json b/tests/scripts/client.close.json
index d449c17..c91c0d6 100644
--- a/tests/scripts/client.close.json
+++ b/tests/scripts/client.close.json
@@ -25,13 +25,6 @@
"params": [],
"return": 12
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "fread",
"params": [
@@ -65,12 +58,5 @@
"@mock-stream"
],
"return":true
- },
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "Unknown"
}
]
\ No newline at end of file
diff --git a/tests/scripts/client.connect-bad-stream.json b/tests/scripts/client.connect-bad-stream.json
new file mode 100644
index 0000000..aecf0fb
--- /dev/null
+++ b/tests/scripts/client.connect-bad-stream.json
@@ -0,0 +1,26 @@
+[
+ {
+ "function": "stream_context_create",
+ "params": [],
+ "return": "@mock-stream-context"
+ },
+ {
+ "function": "stream_socket_client",
+ "params": [
+ "tcp:\/\/localhost:8000",
+ null,
+ null,
+ 5,
+ 4,
+ "@mock-stream-context"
+ ],
+ "return": "@mock-stream"
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "bad stream"
+ }
+]
\ No newline at end of file
diff --git a/tests/scripts/client.connect-handshake-error.json b/tests/scripts/client.connect-handshake-error.json
index f3aaced..3eedef6 100644
--- a/tests/scripts/client.connect-handshake-error.json
+++ b/tests/scripts/client.connect-handshake-error.json
@@ -47,6 +47,13 @@
],
"return": false
},
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "stream"
+ },
{
"function": "stream_get_meta_data",
"params": [
@@ -61,5 +68,19 @@
"unread_bytes": 0,
"seekable": false
}
+ },
+ {
+ "function": "fclose",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": true
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": ""
}
]
\ No newline at end of file
diff --git a/tests/scripts/client.connect-handshake-failure.json b/tests/scripts/client.connect-handshake-failure.json
new file mode 100644
index 0000000..80ae018
--- /dev/null
+++ b/tests/scripts/client.connect-handshake-failure.json
@@ -0,0 +1,50 @@
+[
+ {
+ "function": "stream_context_create",
+ "params": [],
+ "return": "@mock-stream-context"
+ },
+ {
+ "function": "stream_socket_client",
+ "params": [
+ "tcp:\/\/localhost:8000",
+ null,
+ null,
+ 5,
+ 4,
+ "@mock-stream-context"
+ ],
+ "return": "@mock-stream"
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "stream"
+ },
+ {
+ "function": "stream_set_timeout",
+ "params": [
+ "@mock-stream",
+ 5
+ ],
+ "return": true
+ },
+ {
+ "function": "fwrite",
+ "params": [
+ "@mock-stream"
+ ],
+ "return-op": "key-save",
+ "return": 199
+ },
+ {
+ "function": "fgets",
+ "params": [
+ "@mock-stream",
+ 1024
+ ],
+ "return": false
+ }
+]
\ No newline at end of file
diff --git a/tests/scripts/client.connect-persistent-failure.json b/tests/scripts/client.connect-persistent-failure.json
new file mode 100644
index 0000000..337377d
--- /dev/null
+++ b/tests/scripts/client.connect-persistent-failure.json
@@ -0,0 +1,34 @@
+[
+ {
+ "function": "stream_context_create",
+ "params": [],
+ "return": "@mock-stream-context"
+ },
+ {
+ "function": "stream_socket_client",
+ "params": [
+ "tcp:\/\/localhost:8000",
+ null,
+ null,
+ 5,
+ 5,
+ "@mock-stream-context"
+ ],
+ "return": "@mock-stream"
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "persistent stream"
+ },
+ {
+ "function": "ftell",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": false
+ }
+]
+
diff --git a/tests/scripts/client.connect-persistent.json b/tests/scripts/client.connect-persistent.json
index 61a7af9..7e9c154 100644
--- a/tests/scripts/client.connect-persistent.json
+++ b/tests/scripts/client.connect-persistent.json
@@ -23,13 +23,6 @@
],
"return": "persistent stream"
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "persistent stream"
- },
{
"function": "ftell",
"params": [
@@ -76,13 +69,6 @@
],
"return": "persistent stream"
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "persistent stream"
- },
{
"function": "fclose",
"params": [
diff --git a/tests/scripts/client.destruct.json b/tests/scripts/client.destruct.json
index c04755b..739e6fb 100644
--- a/tests/scripts/client.destruct.json
+++ b/tests/scripts/client.destruct.json
@@ -1,11 +1,4 @@
[
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "get_resource_type",
"params": [
diff --git a/tests/scripts/client.reconnect.json b/tests/scripts/client.reconnect.json
index 9344d55..cfd9621 100644
--- a/tests/scripts/client.reconnect.json
+++ b/tests/scripts/client.reconnect.json
@@ -4,14 +4,14 @@
"params": [
"@mock-stream"
],
- "return": "Unknown"
+ "return": "unknown"
},
{
"function": "get_resource_type",
"params": [
"@mock-stream"
],
- "return": "stream"
+ "return": "unknown"
},
{
"function": "stream_context_create",
diff --git a/tests/scripts/close-remote.json b/tests/scripts/close-remote.json
index d2eea02..42be0f5 100644
--- a/tests/scripts/close-remote.json
+++ b/tests/scripts/close-remote.json
@@ -33,13 +33,6 @@
"return-op": "chr-array",
"return": [117, 35, 170, 152, 89, 60, 128, 154, 81]
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "fwrite",
"params": [],
diff --git a/tests/scripts/ping-pong.json b/tests/scripts/ping-pong.json
index b647e91..d253f8f 100644
--- a/tests/scripts/ping-pong.json
+++ b/tests/scripts/ping-pong.json
@@ -106,13 +106,6 @@
"return-op": "chr-array",
"return": [247, 33, 169, 172, 218, 57, 224, 185, 221, 35, 167]
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "fwrite",
"params": [
diff --git a/tests/scripts/receive-client-timeout.json b/tests/scripts/receive-client-timeout.json
index 5a258b0..eca537c 100644
--- a/tests/scripts/receive-client-timeout.json
+++ b/tests/scripts/receive-client-timeout.json
@@ -33,13 +33,6 @@
],
"return": "stream"
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "fclose",
"params": [
diff --git a/tests/scripts/server.accept-failed-handshake.json b/tests/scripts/server.accept-failed-handshake.json
new file mode 100644
index 0000000..4827dad
--- /dev/null
+++ b/tests/scripts/server.accept-failed-handshake.json
@@ -0,0 +1,32 @@
+[
+ {
+ "function": "stream_socket_accept",
+ "params": [
+ "@mock-socket"
+ ],
+ "return": "@mock-stream"
+ },
+ {
+ "function": "stream_socket_get_name",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "127.0.0.1:12345"
+ },
+ {
+ "function": "stream_get_line",
+ "params": [
+ "@mock-stream",
+ 1024,
+ "\r\n"
+ ],
+ "return": false
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": ""
+ }
+]
\ No newline at end of file
diff --git a/tests/scripts/server.close.json b/tests/scripts/server.close.json
index b6e045c..dc37429 100644
--- a/tests/scripts/server.close.json
+++ b/tests/scripts/server.close.json
@@ -5,7 +5,7 @@
"params": [
"@mock-stream"
],
- "return": true
+ "return": "stream"
},
{
"function": "get_resource_type",
@@ -19,13 +19,6 @@
"params": [],
"return":12
},
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "stream"
- },
{
"function": "fread",
"params": [
@@ -59,12 +52,5 @@
"@mock-stream"
],
"return": true
- },
- {
- "function": "get_resource_type",
- "params": [
- "@mock-stream"
- ],
- "return": "Unknown"
}
]
\ No newline at end of file
diff --git a/tests/scripts/server.disconnect.json b/tests/scripts/server.disconnect.json
new file mode 100644
index 0000000..7cfd788
--- /dev/null
+++ b/tests/scripts/server.disconnect.json
@@ -0,0 +1,24 @@
+[
+
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": "stream"
+ },
+ {
+ "function": "fclose",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": true
+ },
+ {
+ "function": "get_resource_type",
+ "params": [
+ "@mock-stream"
+ ],
+ "return": ""
+ }
+]
\ No newline at end of file