Spaces:
No application file
No application file
File size: 4,288 Bytes
d2897cd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
<?php
namespace Mautic\WebhookBundle\Command;
use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\WebhookBundle\Model\WebhookModel;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
* CLI Command to process queued webhook payloads.
*/
class ProcessWebhookQueuesCommand extends Command
{
    public const COMMAND_NAME = 'mautic:webhooks:process';

    /**
     * Short description of the command.
     */
    protected static $defaultDescription = 'Process queued webhook payloads';

    public function __construct(
        private CoreParametersHelper $coreParametersHelper,
        private WebhookModel $webhookModel,
    ) {
        parent::__construct();
    }

    /**
     * Registers the command name and its three optional options:
     * --webhook-id (-i), --min-id and --max-id.
     */
    protected function configure(): void
    {
        $this->setName(self::COMMAND_NAME)
            ->addOption(
                'webhook-id',
                'i',
                InputOption::VALUE_OPTIONAL,
                'Process payload for a specific webhook. If not specified, all webhooks will be processed.',
                null,
            )
            ->addOption(
                'min-id',
                null,
                InputOption::VALUE_OPTIONAL,
                'Sets the minimum webhook queue ID to process (so called range mode).',
                null,
            )
            ->addOption(
                'max-id',
                null,
                InputOption::VALUE_OPTIONAL,
                'Sets the maximum webhook queue ID to process (so called range mode).',
                null,
            );
    }

    /**
     * Processes the queued payloads of one or of all published webhooks.
     *
     * Range mode (batched processing between --min-id and --max-id) is only
     * used when --webhook-id is given and both bounds are non-zero; in every
     * other case the selected webhooks are processed in one call.
     *
     * @return int Command::SUCCESS when processing finished or when the bundle
     *             is not in command mode; Command::FAILURE when no published
     *             webhook was found or an exception was thrown while processing
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // The command is only meaningful when the queue is configured for command processing.
        if (WebhookModel::COMMAND_PROCESS !== $this->coreParametersHelper->get('queue_mode')) {
            $output->writeln('Webhook Bundle is in immediate process mode. To use the command function change to command mode.');

            return Command::SUCCESS;
        }

        $id    = $input->getOption('webhook-id');
        $minId = (int) $input->getOption('min-id');
        $maxId = (int) $input->getOption('max-id');

        // Defined up front so the check below never reads an undefined variable
        // when no webhook ID was supplied.
        $queueRangeMode = false;

        if ($id) {
            $webhook        = $this->webhookModel->getEntity($id);
            $webhooks       = (null !== $webhook && $webhook->isPublished()) ? [$id => $webhook] : [];
            $queueRangeMode = $minId && $maxId;
        } else {
            // Make sure we only get published webhook entities.
            $webhooks = $this->webhookModel->getEntities(
                [
                    'filter' => [
                        'force' => [
                            [
                                'column' => 'e.isPublished',
                                'expr'   => 'eq',
                                'value'  => 1,
                            ],
                        ],
                    ],
                ]
            );
        }

        if (0 === count($webhooks)) {
            $output->writeln('<error>No published webhooks found. Try again later.</error>');

            return Command::FAILURE;
        }

        $output->writeln('<info>Processing Webhooks</info>');

        try {
            if ($queueRangeMode) {
                $webhookLimit = $this->webhookModel->getWebhookLimit();

                // A step below one would never advance the loop.
                if (1 > $webhookLimit) {
                    throw new \InvalidArgumentException('`webhook limit` parameter must be greater than zero.');
                }

                // Walk the requested ID range in windows of $webhookLimit queue IDs,
                // clamping the last window to $maxId.
                for (; $minId <= $maxId; $minId += $webhookLimit) {
                    $this->webhookModel
                        ->setMinQueueId($minId)
                        ->setMaxQueueId(min($minId + $webhookLimit - 1, $maxId));

                    $this->webhookModel->processWebhook(current($webhooks));
                }
            } else {
                $this->webhookModel->processWebhooks($webhooks);
            }
        } catch (\Exception $e) {
            $output->writeln('<error>'.$e->getMessage().'</error>');
            $output->writeln('<error>'.$e->getTraceAsString().'</error>');

            return Command::FAILURE;
        }

        $output->writeln('<info>Webhook Processing Complete</info>');

        return Command::SUCCESS;
    }
}
|