Skip to content

Commit

Permalink
Fix issue with chunked reading from process and extract it to separat…
Browse files Browse the repository at this point in the history
…e class
  • Loading branch information
donhardman committed Dec 17, 2024
1 parent 7f6b694 commit 2cae3fc
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/Process/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Manticoresearch\Buddy\Core\Tool\Strings;
use Swoole\Process as SwooleProcess;

/** @package Manticoresearch\Buddy\Core\Process */
final class Process {
/** @var array<string,Worker> $workers */
protected array $workers = [];
Expand Down Expand Up @@ -141,13 +142,18 @@ static function (SwooleProcess $worker) use ($processor) {
$name = Buddy::getProcessName(Strings::classNameToIdentifier($processor::class));
swoole_set_process_name($name);
chdir(sys_get_temp_dir());
while ($msg = $worker->read()) {
/** @var string $msg */
$fn = $processor->parseMessage($msg);
if (!$fn) {
continue;
/** @phpstan-ignore-next-line */
while (true) {
$reader = ProcessReader::read($worker);
foreach ($reader as $msg) {
/** @var string $msg */
$fn = $processor->parseMessage($msg);
if (!$fn) {
continue;
}
$fn();
}
$fn();
usleep(100);
}
}, true, 2
);
Expand Down Expand Up @@ -184,7 +190,8 @@ public function stop(): static {
*/
public function execute(string $method, array $args = []): static {
Buddy::debugv("[process] execute: $method " . json_encode($args));
$this->process->write(serialize([$method, $args]));
$message = ProcessReader::packMessage([$method, $args]);
$this->process->write($message);
return $this;
}
}
83 changes: 83 additions & 0 deletions src/Process/ProcessReader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php declare(strict_types=1);

namespace Manticoresearch\Buddy\Core\Process;

use Generator;
use RuntimeException;
use Swoole\Process as SwooleProcess;

/** @package */
final class ProcessReader {
/**
* Serialize and pack message
* @param array<mixed> $message
* @return string
*/
public static function packMessage(array $message): string {
$serialized = serialize($message);
return pack('N', strlen($serialized)) . $serialized;
}

/**
* Unpack the message and returns message to unserialize and rest of it in case have we have incomplete
* @param string $message
* @return array{0:string,1:string}
* @throws RuntimeException
*/
public static function unpackMessage(string $message): array {
$length = static::getMessageLen($message);
$chunk = substr($message, 4, $length);
return [$chunk, substr($message, $length + 4)];
}

/**
* Validate if the message is complete or not
* cuz sometimes Swoole may send it in the different packages
* @param string $message
* @return bool
*/
public static function isMessageComplete(string $message): bool {
$length = static::getMessageLen($message);
return strlen($message) >= $length + 4;
}

/**
* @param string $message
* @return int
* @throws RuntimeException
*/
protected static function getMessageLen(string $message): int {
$unpacked = unpack('N', $message);
if (!$unpacked) {
throw new RuntimeException('Failed to unpack message');
}
return $unpacked[1];
}

/**
* Wrapper to read chunked message from the worker
* @param SwooleProcess $worker
* @return Generator<string>
*/
public static function read(SwooleProcess $worker): Generator {
$buffer = '';
$chunk = $worker->read();

if ($chunk) {
$buffer .= $chunk;

while ($buffer) {
if (!static::isMessageComplete($buffer)) {
break;
}

[$current, $remaining] = static::unpackMessage($buffer);
yield $current;

$buffer = $remaining;
}
}

return $buffer;
}
}

0 comments on commit 2cae3fc

Please sign in to comment.