Spaces:
No application file
No application file
File size: 2,014 Bytes
d2897cd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
<?php
declare(strict_types=1);
namespace Mautic\CoreBundle\Service;
use Symfony\Component\Process\Process;
/**
 * Runs queued processes with a cap on how many are started at the same time.
 *
 * Processes move through three collections: `pending` (enqueued, not yet
 * started), `processing` (started and still reported as running) and
 * `processed` (started and no longer running). Nothing happens on its own:
 * the caller has to invoke refresh() repeatedly to advance the queue.
 */
final class ProcessQueue
{
    /**
     * Processes waiting to be started, in FIFO order.
     *
     * @var \SplQueue<Process>
     */
    private \SplQueue $pending;

    /**
     * Processes that were started and were still running at the last refresh().
     *
     * @var \SplObjectStorage<Process,Process>
     */
    private \SplObjectStorage $processing;

    /**
     * Processes that were started and have since stopped running.
     *
     * @var \SplObjectStorage<Process,Process>
     */
    private \SplObjectStorage $processed;

    /**
     * @param int $processLimit maximum number of processes kept in the processing set at once
     */
    public function __construct(
        private int $processLimit = 10
    ) {
        $this->pending    = new \SplQueue();
        $this->processing = new \SplObjectStorage();
        $this->processed  = new \SplObjectStorage();
    }

    /**
     * Adds a process to the end of the pending queue. The process is not
     * started here; it is started by a later refresh() call.
     *
     * @param Process<mixed> $process
     */
    public function enqueue(Process $process): void
    {
        $this->pending->enqueue($process);
    }

    /**
     * Advances the queue by one step.
     *
     * First moves every process that is no longer running from the processing
     * set to the processed set, then starts pending processes until either the
     * processing set reaches the process limit or the pending queue is empty.
     */
    public function refresh(): void
    {
        // Collect finished processes first. Detaching the current entry of an
        // SplObjectStorage while iterating over it makes the iterator skip the
        // following entry, which would leave finished processes behind.
        $finished = [];
        foreach ($this->processing as $process) {
            if (!$process->isRunning()) {
                $finished[] = $process;
            }
        }

        // Safe to mutate the storage now that iteration is over.
        foreach ($finished as $process) {
            $this->processing->detach($process);
            $this->processed->attach($process);
        }

        // Fill the free slots with pending processes.
        for ($i = $this->processing->count(); $i < $this->processLimit; ++$i) {
            if ($this->pending->isEmpty()) {
                break;
            }

            $process = $this->pending->dequeue();
            $process->start();
            $this->processing->attach($process);
        }
    }

    /**
     * Tells whether the processing set is non-empty.
     *
     * This reflects the state as of the last refresh(); pending processes
     * that have not been started yet are not taken into account.
     */
    public function isProcessing(): bool
    {
        return $this->getProcessingCount() > 0;
    }

    /**
     * Returns the number of processes in the processed set.
     */
    public function getProcessedCount(): int
    {
        return $this->processed->count();
    }

    /**
     * Returns the number of processes in the processing set.
     */
    public function getProcessingCount(): int
    {
        return $this->processing->count();
    }

    /**
     * Returns the processed set itself (not a copy).
     *
     * @return \SplObjectStorage<Process,Process>
     */
    public function getProcessed(): \SplObjectStorage
    {
        return $this->processed;
    }
}
|