Skip to content

Commit

Permalink
Refactor MQTT client loop implementation to allow easier integration …
Browse files Browse the repository at this point in the history
…with third party event loops (#115)

* Refactor MQTT client loop implementation

This refactoring add a loopOnce method to MqttClient class. This method allows the MQTT client implementation to be integrated to an event loop (like ReactPHP - Ratchet).

* Update loopOnce documentation

* Add method to interface, refactor and reformat code

Co-authored-by: Marvin Mall <[email protected]>
  • Loading branch information
Alex6092 and Namoshek authored Jun 15, 2022
1 parent 77f6854 commit d543813
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
22 changes: 22 additions & 0 deletions src/Contracts/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\RepositoryException;
Expand Down Expand Up @@ -143,11 +144,32 @@ public function interrupt(): void;
* @param int|null $queueWaitLimit
* @return void
* @throws DataTransferException
* @throws InvalidMessageException
* @throws MqttClientException
* @throws ProtocolViolationException
*/
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, int $queueWaitLimit = null): void;

/**
* Runs an event loop iteration that handles messages from the server and calls the registered
* callbacks for published messages. Also resends pending messages and calls loop event handlers.
*
* This method can be used to integrate the MQTT client in another event loop (like ReactPHP or Ratchet).
*
* Note: To ensure the event handlers called by this method will receive the correct elapsed time,
* the caller is responsible to provide the correct starting time of the loop as returned by `microtime(true)`.
*
* @param float $loopStartedAt
* @param bool $allowSleep
* @param int $sleepMicroseconds
* @return void
* @throws DataTransferException
* @throws InvalidMessageException
* @throws MqttClientException
* @throws ProtocolViolationException
*/
public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sleepMicroseconds = 100000): void;

/**
* Returns the host used by the client to connect to.
*
Expand Down
54 changes: 31 additions & 23 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -641,29 +641,7 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false,
break;
}

$elapsedTime = microtime(true) - $loopStartedAt;
$this->runLoopEventHandlers($elapsedTime);

// Read data from the socket - as much as available.
$this->buffer .= $this->readAllAvailableDataFromSocket();

// Try to parse a message from the buffer and handle it, as long as messages can be parsed.
if (strlen($this->buffer) > 0) {
$this->processMessageBuffer();
} elseif ($allowSleep) {
usleep(100000); // 100ms
}

// Republish messages expired without confirmation.
// This includes published messages, subscribe and unsubscribe requests.
$this->resendPendingMessages();

// If the last message of the broker has been received more seconds ago
// than specified by the keep alive time, we will send a ping to ensure
// the connection is kept alive.
if ($this->nextPingAt() <= microtime(true)) {
$this->ping();
}
$this->loopOnce($loopStartedAt, $allowSleep);

// If configured, the loop is exited if all queues are empty or a certain time limit is reached (i.e. retry is aborted).
// In any case, there may not be any active subscriptions though.
Expand All @@ -681,6 +659,36 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false,
}
}

/**
* {@inheritDoc}
*/
public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sleepMicroseconds = 100000): void
{
$elapsedTime = microtime(true) - $loopStartedAt;
$this->runLoopEventHandlers($elapsedTime);

// Read data from the socket - as much as available.
$this->buffer .= $this->readAllAvailableDataFromSocket();

// Try to parse a message from the buffer and handle it, as long as messages can be parsed.
if (strlen($this->buffer) > 0) {
$this->processMessageBuffer();
} elseif ($allowSleep) {
usleep($sleepMicroseconds);
}

// Republish messages expired without confirmation.
// This includes published messages, subscribe and unsubscribe requests.
$this->resendPendingMessages();

// If the last message of the broker has been received more seconds ago
// than specified by the keep alive time, we will send a ping to ensure
// the connection is kept alive.
if ($this->nextPingAt() <= microtime(true)) {
$this->ping();
}
}

/**
* Processes the incoming message buffer by parsing and handling the messages, until the buffer is empty.
*
Expand Down

0 comments on commit d543813

Please sign in to comment.