File size: 4,288 Bytes
d2897cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
<?php

namespace Mautic\WebhookBundle\Command;

use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\WebhookBundle\Model\WebhookModel;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * CLI Command to process queued webhook payloads.
 *
 * Only does work when the `queue_mode` core parameter equals
 * WebhookModel::COMMAND_PROCESS; otherwise it prints a notice and exits
 * successfully.
 *
 * Two processing modes are supported:
 *  - default: every selected published webhook is handed to
 *    WebhookModel::processWebhooks();
 *  - range mode: when --webhook-id, --min-id and --max-id are all given
 *    (and non-zero), the queue ID range is walked in slices of
 *    WebhookModel::getWebhookLimit() IDs for that single webhook.
 */
class ProcessWebhookQueuesCommand extends Command
{
    public const COMMAND_NAME = 'mautic:webhooks:process';

    protected static $defaultDescription = 'Process queued webhook payloads';

    public function __construct(
        private CoreParametersHelper $coreParametersHelper,
        private WebhookModel $webhookModel
    ) {
        parent::__construct();
    }

    /**
     * Registers the command name and its --webhook-id / --min-id / --max-id options.
     */
    protected function configure()
    {
        $this->setName(self::COMMAND_NAME)
            ->addOption(
                '--webhook-id',
                '-i',
                InputOption::VALUE_OPTIONAL,
                'Process payload for a specific webhook.  If not specified, all webhooks will be processed.',
                null
            )
            ->addOption(
                '--min-id',
                null,
                InputOption::VALUE_OPTIONAL,
                'Sets the minimum webhook queue ID to process (so called range mode).',
                null
            )
            ->addOption(
                '--max-id',
                null,
                InputOption::VALUE_OPTIONAL,
                'Sets the maximum webhook queue ID to process (so called range mode).',
                null
            );
    }

    /**
     * Processes the queued payloads of the selected published webhooks.
     *
     * @return int Command::SUCCESS when processing finished or the bundle is not in command mode;
     *             Command::FAILURE when no published webhook was found or processing threw an exception
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // check to make sure we are in queue mode
        if ($this->coreParametersHelper->get('queue_mode') != $this->webhookModel::COMMAND_PROCESS) {
            $output->writeln('Webhook Bundle is in immediate process mode. To use the command function change to command mode.');

            return Command::SUCCESS;
        }

        $id    = $input->getOption('webhook-id');
        $minId = (int) $input->getOption('min-id');
        $maxId = (int) $input->getOption('max-id');

        // Range mode is only meaningful for a single webhook. Default it to off so the
        // "all webhooks" path below never reads an undefined variable.
        $queueRangeMode = false;

        if ($id) {
            $webhook        = $this->webhookModel->getEntity($id);
            $webhooks       = (null !== $webhook && $webhook->isPublished()) ? [$id => $webhook] : [];
            $queueRangeMode = $minId && $maxId;
        } else {
            // make sure we only get published webhook entities
            $webhooks = $this->webhookModel->getEntities(
                [
                    'filter' => [
                        'force' => [
                            [
                                'column' => 'e.isPublished',
                                'expr'   => 'eq',
                                'value'  => 1,
                            ],
                        ],
                    ],
                ]
            );
        }

        if (!count($webhooks)) {
            $output->writeln('<error>No published webhooks found. Try again later.</error>');

            return Command::FAILURE;
        }

        $output->writeln('<info>Processing Webhooks</info>');

        try {
            if ($queueRangeMode) {
                $webhookLimit = $this->webhookModel->getWebhookLimit();

                // A non-positive step would keep the loop below from ever advancing.
                if (1 > $webhookLimit) {
                    throw new \InvalidArgumentException('`webhook limit` parameter must be greater than zero.');
                }

                // Walk [minId, maxId] in consecutive slices of $webhookLimit queue IDs;
                // the last slice is clamped to $maxId.
                for (; $minId <= $maxId; $minId += $webhookLimit) {
                    $this->webhookModel
                        ->setMinQueueId($minId)
                        ->setMaxQueueId(min($minId + $webhookLimit - 1, $maxId));

                    // In range mode $webhooks holds exactly the one webhook selected by --webhook-id.
                    $this->webhookModel->processWebhook(current($webhooks));
                }
            } else {
                $this->webhookModel->processWebhooks($webhooks);
            }
        } catch (\Exception $e) {
            $output->writeln('<error>'.$e->getMessage().'</error>');
            $output->writeln('<error>'.$e->getTraceAsString().'</error>');

            return Command::FAILURE;
        }

        $output->writeln('<info>Webhook Processing Complete</info>');

        return Command::SUCCESS;
    }
}