chrisbryan17's picture
Upload folder using huggingface_hub
d2897cd verified
raw
history blame contribute delete
21.2 kB
<?php
namespace Mautic\WebhookBundle\Model;
use Doctrine\Common\Collections\Criteria;
use Doctrine\ORM\EntityManager;
use JMS\Serializer\SerializationContext;
use JMS\Serializer\SerializerInterface;
use Mautic\ApiBundle\Serializer\Exclusion\PublishDetailsExclusionStrategy;
use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\CoreBundle\Helper\EncryptionHelper;
use Mautic\CoreBundle\Helper\UserHelper;
use Mautic\CoreBundle\Model\FormModel;
use Mautic\CoreBundle\Security\Permissions\CorePermissions;
use Mautic\CoreBundle\Translation\Translator;
use Mautic\WebhookBundle\Entity\Event;
use Mautic\WebhookBundle\Entity\EventRepository;
use Mautic\WebhookBundle\Entity\Log;
use Mautic\WebhookBundle\Entity\LogRepository;
use Mautic\WebhookBundle\Entity\Webhook;
use Mautic\WebhookBundle\Entity\WebhookQueue;
use Mautic\WebhookBundle\Entity\WebhookQueueRepository;
use Mautic\WebhookBundle\Entity\WebhookRepository;
use Mautic\WebhookBundle\Event as Events;
use Mautic\WebhookBundle\Event\WebhookEvent;
use Mautic\WebhookBundle\Form\Type\WebhookType;
use Mautic\WebhookBundle\Http\Client;
use Mautic\WebhookBundle\WebhookEvents;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Form\FormFactoryInterface;
use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
use Symfony\Contracts\EventDispatcher\Event as SymfonyEvent;
/**
* @extends FormModel<Webhook>
*/
class WebhookModel extends FormModel
{
/**
* 2 possible types of the processing of the webhooks.
*/
public const COMMAND_PROCESS = 'command_process';
public const IMMEDIATE_PROCESS = 'immediate_process';
private const DELETE_BATCH_LIMIT = 5000;
public const WEBHOOK_LOG_MAX = 1000;
/**
* Whet queue mode is turned on.
*
* @var string
*/
protected $queueMode;
/**
* How many entities to add into one queued webhook.
*
* @var int
*/
protected $webhookLimit;
/**
* Sets min webhook queue ID to get/process.
*/
protected ?int $minQueueId = null;
/**
* Sets max webhook queue ID to get/process.
*/
protected ?int $maxQueueId = null;
/**
* How long the webhook processing can run in seconds.
*/
private int $webhookTimeLimit;
/**
* How many responses in 1 row can fail until the webhook disables itself.
*
* @var int
*/
protected $disableLimit;
/**
* How many seconds will we wait for the response.
*
* @var int in seconds
*/
protected $webhookTimeout;
/**
* The key is queue ID, the value is the WebhookQueue object.
*
* @var array
*/
protected $webhookQueueIdList = [];
/**
* How many recent log records should be kept.
*
* @var int
*/
protected $logMax;
/**
* Queued events default order by dir
* Possible values: ['ASC', 'DESC'].
*
* @var string
*/
protected $eventsOrderByDir;
/**
* Timestamp when the webhook processing starts.
*/
private string|float|null $startTime = null;
public function __construct(
CoreParametersHelper $coreParametersHelper,
protected SerializerInterface $serializer,
private Client $httpClient,
EntityManager $em,
CorePermissions $security,
EventDispatcherInterface $dispatcher,
UrlGeneratorInterface $router,
Translator $translator,
UserHelper $userHelper,
LoggerInterface $mauticLogger
) {
$this->setConfigProps($coreParametersHelper);
parent::__construct($em, $security, $dispatcher, $router, $translator, $userHelper, $mauticLogger, $coreParametersHelper);
}
/**
* @param Webhook $entity
*/
public function saveEntity($entity, $unlock = true): void
{
if (null === $entity->getSecret()) {
$entity->setSecret(EncryptionHelper::generateKey());
}
parent::saveEntity($entity, $unlock);
}
/**
* @param Webhook $entity
* @param array<mixed> $options
*
* @throws \Symfony\Component\HttpKernel\Exception\NotFoundHttpException
*/
public function createForm($entity, FormFactoryInterface $formFactory, $action = null, $options = []): \Symfony\Component\Form\FormInterface
{
if (!$entity instanceof Webhook) {
throw new MethodNotAllowedHttpException(['Webhook']);
}
if (!empty($action)) {
$options['action'] = $action;
}
$options['events'] = $this->getEvents();
return $formFactory->create(WebhookType::class, $entity, $options);
}
public function getEntity($id = null): ?Webhook
{
if (null === $id) {
return new Webhook();
}
return parent::getEntity($id);
}
/**
* @return WebhookRepository
*/
public function getRepository()
{
return $this->em->getRepository(Webhook::class);
}
/**
* Gets array of custom events from bundles subscribed MauticWehbhookBundle::WEBHOOK_ON_BUILD.
*
* @return mixed
*/
public function getEvents()
{
static $events;
if (empty($events)) {
// build them
$events = [];
$event = new Events\WebhookBuilderEvent($this->translator);
$this->dispatcher->dispatch($event, WebhookEvents::WEBHOOK_ON_BUILD);
$events = $event->getEvents();
}
return $events;
}
/**
* Get a list of webhooks by matching events.
*
* @param string $type string of event type
*
* @return array
*/
public function getEventWebooksByType($type)
{
return $this->getEventRepository()->getEntitiesByEventType($type);
}
public function queueWebhooksByType($type, $payload, array $groups = []): void
{
$this->queueWebhooks(
$this->getEventWebooksByType($type),
$payload,
$groups
);
}
public function queueWebhooks($webhookEvents, $payload, array $serializationGroups = []): void
{
if (!count($webhookEvents) || !is_array($webhookEvents)) {
return;
}
/** @var Event $event */
foreach ($webhookEvents as $event) {
$webhook = $event->getWebhook();
$queue = $this->queueWebhook($webhook, $event, $payload, $serializationGroups);
if (self::COMMAND_PROCESS === $this->queueMode) {
// Queue to the database to process later
$this->getQueueRepository()->saveEntity($queue);
} else {
// Immediately process
$this->processWebhook($webhook, $queue);
}
}
}
/**
* Creates a WebhookQueue entity, sets the date and returns the created entity.
*/
public function queueWebhook(Webhook $webhook, $event, $payload, array $serializationGroups = []): WebhookQueue
{
$serializedPayload = $this->serializeData($payload, $serializationGroups);
$queue = new WebhookQueue();
$queue->setWebhook($webhook);
$queue->setDateAdded(new \DateTime());
$queue->setEvent($event);
$queue->setPayload($serializedPayload);
// fire events for when the queues are created
if ($this->dispatcher->hasListeners(WebhookEvents::WEBHOOK_QUEUE_ON_ADD)) {
$webhookQueueEvent = $event = new Events\WebhookQueueEvent($queue, $webhook, true);
$this->dispatcher->dispatch($webhookQueueEvent, WebhookEvents::WEBHOOK_QUEUE_ON_ADD);
}
return $queue;
}
/**
* Execute a list of webhooks to their specified endpoints.
*
* @param array|\Doctrine\ORM\Tools\Pagination\Paginator $webhooks
*/
public function processWebhooks($webhooks): void
{
$this->startTime = microtime(true);
foreach ($webhooks as $webhook) {
$this->processWebhook($webhook);
}
}
public function processWebhook(Webhook $webhook, WebhookQueue $queue = null): bool
{
// get the webhook payload
$payload = $this->getWebhookPayload($webhook, $queue);
// if there wasn't a payload we can stop here
if (empty($payload)) {
return false;
}
$start = microtime(true);
$webhookQueueRepo = $this->getQueueRepository();
try {
$response = $this->httpClient->post($webhook->getWebhookUrl(), $payload, $webhook->getSecret());
// remove successfully processed queues from the Webhook object so they won't get stored again
$queueIds = array_keys($this->webhookQueueIdList);
$chunkedQueueIds = array_chunk($queueIds, self::DELETE_BATCH_LIMIT);
foreach ($chunkedQueueIds as $queueIds) {
$webhookQueueRepo->deleteQueuesById($queueIds);
}
$responseBody = $response->getBody()->getContents();
if (!$responseBody) {
$responseBody = null; // Save null value to database
}
$responseStatusCode = $response->getStatusCode();
$this->addLog($webhook, $responseStatusCode, microtime(true) - $start, $responseBody);
// throw an error exception if we don't get a 200 back
if ($responseStatusCode >= 300 || $responseStatusCode < 200) {
// The receiver of the webhook is telling us to stop bothering him with our requests by code 410
if (410 === $responseStatusCode) {
$this->killWebhook($webhook, 'mautic.webhook.stopped.reason.410');
}
throw new \ErrorException($webhook->getWebhookUrl().' returned '.$responseStatusCode.' with payload: '.json_encode($payload));
}
} catch (\Exception $e) {
$message = $e->getMessage();
if ($this->isSick($webhook)) {
$this->killWebhook($webhook);
$message .= ' '.$this->translator->trans('mautic.webhook.killed', ['%limit%' => $this->disableLimit]);
}
// log any errors but allow the script to keep running
$this->logger->error($message);
// log that the request failed to display it to the user
$this->addLog($webhook, 'N/A', microtime(true) - $start, $message);
return false;
}
// Run this on command as well as immediate send because if switched from queue to immediate
// it can have some rows in the queue which will be send in every webhook forever
if (!empty($this->webhookQueueIdList)) {
// delete all the queued items we just processed
$webhookQueueRepo->deleteQueuesById(array_keys($this->webhookQueueIdList));
$nextWebhookExists = $webhookQueueRepo->exists($webhook->getId());
// reset the array to blank so none of the IDs are repeated
$this->webhookQueueIdList = [];
// if there are still items in the queue after processing we re-process
// WARNING: this is recursive
if ($nextWebhookExists && !$this->isProcessingExpired()) {
$this->processWebhook($webhook);
}
}
return true;
}
/**
* Look into the history and check if all the responses we care about had failed.
* But let it run for a while after the user modified it. Lets not aggravate the user.
*/
public function isSick(Webhook $webhook): bool
{
// Do not mess with the user will! (at least not now)
if ($webhook->wasModifiedRecently()) {
return false;
}
$successRadio = $this->getLogRepository()->getSuccessVsErrorStatusCodeRatio($webhook->getId(), $this->disableLimit);
// If there are no log rows yet, consider it healthy
if (null === $successRadio) {
return false;
}
return !$successRadio;
}
/**
* Unpublish the webhook so it will stop emit the requests
* and notify user about it.
*
* @param string $reason
*/
public function killWebhook(Webhook $webhook, $reason = 'mautic.webhook.stopped.reason'): void
{
$webhook->setIsPublished(false);
$this->saveEntity($webhook);
$event = new WebhookEvent($webhook, false, $reason);
$this->dispatcher->dispatch($event, WebhookEvents::WEBHOOK_KILL);
}
/**
* Add a log for the webhook response HTTP status and save it.
*
* @param int $statusCode
* @param float $runtime in seconds
* @param string $note
* $runtime variable unit is in seconds
*/
public function addLog(Webhook $webhook, $statusCode, $runtime, $note = null): void
{
if (!$webhook->getId()) {
return;
}
if (!$this->coreParametersHelper->get('clean_webhook_logs_in_background')) {
$this->getLogRepository()->removeLimitExceedLogs($webhook->getId(), $this->logMax);
}
$log = new Log();
$log->setWebhook($webhook);
$log->setNote($note);
$log->setRuntime($runtime);
$log->setStatusCode($statusCode);
$log->setDateAdded(new \DateTime());
$webhook->addLog($log);
$this->saveEntity($webhook);
}
/**
* @return WebhookQueueRepository
*/
public function getQueueRepository()
{
return $this->em->getRepository(WebhookQueue::class);
}
/**
* @return EventRepository
*/
public function getEventRepository()
{
return $this->em->getRepository(Event::class);
}
/**
* @return LogRepository
*/
public function getLogRepository()
{
return $this->em->getRepository(Log::class);
}
/**
* Get the payload from the webhook.
*
* @return array
*/
public function getWebhookPayload(Webhook $webhook, WebhookQueue $queue = null)
{
if ($payload = $webhook->getPayload()) {
return $payload;
}
$payload = [];
if (self::COMMAND_PROCESS === $this->queueMode) {
$queuesArray = $this->getWebhookQueues($webhook);
} else {
$queuesArray = null !== $queue ? [$queue] : [];
}
/* @var WebhookQueue $queueItem */
foreach ($queuesArray as $queueItem) {
/** @var Event $event */
$event = $queueItem->getEvent();
$type = $event->getEventType();
// create new array level for each unique event type
if (!isset($payload[$type])) {
$payload[$type] = [];
}
$queuePayload = json_decode($queueItem->getPayload(), true);
$queuePayload['timestamp'] = $queueItem->getDateAdded()->format('c');
// its important to decode the payload form the DB as we re-encode it with the
$payload[$type][] = $queuePayload;
// Add to the webhookQueueIdList only if ID exists.
// That means if it was stored to DB and not sent via immediate send.
if ($queueItem->getId()) {
$this->webhookQueueIdList[$queueItem->getId()] = $queueItem;
// Clear the WebhookQueue entity from memory
$this->em->detach($queueItem);
}
}
return $payload;
}
/**
* Get the queues and order by date so we get events.
*
* @return iterable<object>
*/
public function getWebhookQueues(Webhook $webhook)
{
/** @var WebhookQueueRepository $queueRepo */
$queueRepo = $this->getQueueRepository();
$parameters = [
'iterable_mode' => true,
'start' => 0,
'limit' => $this->webhookLimit,
'orderBy' => $queueRepo->getTableAlias().'.id',
'orderByDir' => $this->getEventsOrderbyDir($webhook),
'filter' => [
'force' => [
[
'column' => 'IDENTITY('.$queueRepo->getTableAlias().'.webhook)',
'expr' => 'eq',
'value' => $webhook->getId(),
],
],
],
];
if ($this->minQueueId && $this->maxQueueId) {
unset($parameters['start']);
unset($parameters['limit']);
$parameters['filter']['force'][] = [
'column' => $queueRepo->getTableAlias().'.id',
'expr' => 'gte',
'value' => $this->minQueueId,
];
$parameters['filter']['force'][] = [
'column' => $queueRepo->getTableAlias().'.id',
'expr' => 'lte',
'value' => $this->maxQueueId,
];
}
return $queueRepo->getEntities($parameters);
}
/**
* Returns either Webhook's orderbyDir or the value from configuration as default.
*
* @return string
*/
public function getEventsOrderbyDir(Webhook $webhook = null)
{
// Try to get the value from Webhook
if ($webhook && $orderByDir = $webhook->getEventsOrderbyDir()) {
return $orderByDir;
}
// Use the global config value if it's not set in the Webhook
return $this->eventsOrderByDir;
}
/**
* @throws MethodNotAllowedHttpException
*/
protected function dispatchEvent($action, &$entity, $isNew = false, SymfonyEvent $event = null): ?SymfonyEvent
{
if (!$entity instanceof Webhook) {
throw new MethodNotAllowedHttpException(['Webhook'], 'Entity must be of class Webhook()');
}
switch ($action) {
case 'pre_save':
$name = WebhookEvents::WEBHOOK_PRE_SAVE;
break;
case 'post_save':
$name = WebhookEvents::WEBHOOK_POST_SAVE;
break;
case 'pre_delete':
$name = WebhookEvents::WEBHOOK_PRE_DELETE;
break;
case 'post_delete':
$name = WebhookEvents::WEBHOOK_POST_DELETE;
break;
default:
return null;
}
if ($this->dispatcher->hasListeners($name)) {
if (empty($event)) {
$event = new WebhookEvent($entity, $isNew);
$event->setEntityManager($this->em);
}
$this->dispatcher->dispatch($event, $name);
return $event;
} else {
return null;
}
}
/**
* @param array $groups
*/
public function serializeData($payload, $groups = [], array $customExclusionStrategies = []): string
{
$context = SerializationContext::create();
if (!empty($groups)) {
$context->setGroups($groups);
}
// Only include FormEntity properties for the top level entity and not the associated entities
$context->addExclusionStrategy(
new PublishDetailsExclusionStrategy()
);
foreach ($customExclusionStrategies as $exclusionStrategy) {
$context->addExclusionStrategy($exclusionStrategy);
}
// include null values
$context->setSerializeNull(true);
// serialize the data and send it as a payload
return $this->serializer->serialize($payload, 'json', $context);
}
public function getPermissionBase(): string
{
return 'webhook:webhooks';
}
public function getWebhookLimit(): int
{
return $this->webhookLimit;
}
public function setMinQueueId(int $minQueueId): self
{
$this->minQueueId = $minQueueId;
return $this;
}
public function setMaxQueueId(int $maxQueueId): self
{
$this->maxQueueId = $maxQueueId;
return $this;
}
private function isProcessingExpired(): bool
{
$currentTime = microtime(true);
$runTime = $currentTime - $this->startTime;
return $runTime >= $this->webhookTimeLimit;
}
/**
* Sets all class properties from CoreParametersHelper.
*/
private function setConfigProps(CoreParametersHelper $coreParametersHelper): void
{
$this->webhookLimit = (int) $coreParametersHelper->get('webhook_limit', 10);
$this->webhookTimeLimit = (int) $coreParametersHelper->get('webhook_time_limit', 600);
$this->disableLimit = (int) $coreParametersHelper->get('webhook_disable_limit', 100);
$this->webhookTimeout = (int) $coreParametersHelper->get('webhook_timeout', 15);
$this->logMax = (int) $coreParametersHelper->get('webhook_log_max', self::WEBHOOK_LOG_MAX);
$this->queueMode = $coreParametersHelper->get('queue_mode');
$this->eventsOrderByDir = $coreParametersHelper->get('events_orderby_dir', Criteria::ASC);
}
}