Spaces:
No application file
No application file
declare(strict_types=1); | |
namespace Mautic\CoreBundle\Service; | |
use Symfony\Component\Process\Process; | |
final class ProcessQueue | |
{ | |
/** | |
* @var \SplQueue<Process> | |
*/ | |
private \SplQueue $pending; | |
/** | |
* @var \SplObjectStorage<Process,Process> | |
*/ | |
private \SplObjectStorage $processing; | |
/** | |
* @var \SplObjectStorage<Process,Process> | |
*/ | |
private \SplObjectStorage $processed; | |
public function __construct( | |
private int $processLimit = 10 | |
) { | |
$this->pending = new \SplQueue(); | |
$this->processing = new \SplObjectStorage(); | |
$this->processed = new \SplObjectStorage(); | |
} | |
/** | |
* @param Process<mixed> $process | |
*/ | |
public function enqueue(Process $process): void | |
{ | |
$this->pending->enqueue($process); | |
} | |
public function refresh(): void | |
{ | |
// Remove finished processes from the processing queue | |
foreach ($this->processing as $process) { | |
if ($process->isRunning()) { | |
continue; | |
} | |
$this->processing->detach($process); | |
$this->processed->attach($process); | |
} | |
// Add new processes to the processing queue | |
for ($i = $this->processing->count(); $i < $this->processLimit; ++$i) { | |
if ($this->pending->isEmpty()) { | |
break; | |
} | |
$process = $this->pending->dequeue(); | |
$process->start(); | |
$this->processing->attach($process); | |
} | |
} | |
public function isProcessing(): bool | |
{ | |
return $this->getProcessingCount() > 0; | |
} | |
public function getProcessedCount(): int | |
{ | |
return $this->processed->count(); | |
} | |
public function getProcessingCount(): int | |
{ | |
return $this->processing->count(); | |
} | |
/** | |
* @return \SplObjectStorage<Process,Process> | |
*/ | |
public function getProcessed(): \SplObjectStorage | |
{ | |
return $this->processed; | |
} | |
} | |