Spaces:
No application file
No application file
declare(strict_types=1); | |
namespace Mautic\IntegrationsBundle\Entity; | |
use Doctrine\DBAL\ArrayParameterType; | |
use Doctrine\DBAL\Query\Expression\CompositeExpression; | |
use Mautic\CoreBundle\Entity\CommonRepository; | |
use Mautic\LeadBundle\Entity\Lead; | |
/** | |
* @extends CommonRepository<FieldChange> | |
*/ | |
class FieldChangeRepository extends CommonRepository | |
{ | |
/** | |
* Takes an object id & type and deletes all entities | |
* that match the given column names. | |
*/ | |
public function deleteEntitiesForObjectByColumnName(int $objectId, string $objectType, array $columnNames): void | |
{ | |
$qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$qb | |
->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') | |
->where( | |
$qb->expr()->and( | |
$qb->expr()->eq('object_type', ':objectType'), | |
$qb->expr()->eq('object_id', ':objectId'), | |
$qb->expr()->in('column_name', ':columnNames') | |
) | |
) | |
->setParameter('objectType', $objectType) | |
->setParameter('objectId', $objectId) | |
->setParameter('columnNames', $columnNames, ArrayParameterType::STRING) | |
->executeStatement(); | |
} | |
/** | |
* Takes an object id & type and deletes all entities that match. | |
*/ | |
public function deleteEntitiesForObject(int $objectId, string $objectType, ?string $integration = null, \DateTimeInterface $toDateTime = null): void | |
{ | |
$qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$expr = CompositeExpression::and($qb->expr()->eq('object_type', ':objectType'), $qb->expr()->eq('object_id', ':objectId')); | |
if ($integration) { | |
$expr = $expr->with( | |
$qb->expr()->eq('integration', ':integration') | |
); | |
$qb->setParameter('integration', $integration); | |
} | |
if (null !== $toDateTime) { | |
$expr = $expr->with($qb->expr()->lte('modified_at', ':toDateTime')); | |
$qb->setParameter('toDateTime', $toDateTime->format('Y-m-d H:i:s')); | |
} | |
$qb->setParameter('objectType', $objectType) | |
->setParameter('objectId', $objectId); | |
$qb | |
->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') | |
->where($expr) | |
->executeStatement(); | |
} | |
/** | |
* @param int|null $afterObjectId | |
* @param int $objectCount | |
*/ | |
public function findChangesBefore(string $integration, string $objectType, \DateTimeInterface $toDateTime, $afterObjectId = null, $objectCount = 100): array | |
{ | |
// Get a list of object IDs so that we can get complete snapshots of the objects | |
$qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$qb | |
->select('f.object_id') | |
->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') | |
->where( | |
$qb->expr()->and( | |
$qb->expr()->eq('f.integration', ':integration'), | |
$qb->expr()->eq('f.object_type', ':objectType'), | |
$qb->expr()->lte('f.modified_at', ':toDateTime') | |
) | |
); | |
if (Lead::class === $objectType) { | |
$qb->join('f', MAUTIC_TABLE_PREFIX.'leads', 'l', 'l.id = f.object_id'); | |
} | |
$qb->setParameter('integration', $integration) | |
->setParameter('objectType', $objectType) | |
->setParameter('toDateTime', $toDateTime->format('Y-m-d H:i:s')) | |
->orderBy('f.object_id') | |
->groupBy('f.object_id') | |
->setMaxResults($objectCount); | |
if ($afterObjectId) { | |
$qb->andWhere( | |
$qb->expr()->gt('f.object_id', (int) $afterObjectId) | |
); | |
} | |
$objectIds = $qb->executeQuery()->fetchFirstColumn(); | |
if (!$objectIds) { | |
return []; | |
} | |
// Get all the field changes for the requested objects | |
$qb | |
->resetQueryParts() | |
->select('*') | |
->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') | |
->where( | |
$qb->expr()->and( | |
$qb->expr()->eq('f.integration', ':integration'), | |
$qb->expr()->eq('f.object_type', ':objectType'), | |
$qb->expr()->in('f.object_id', $objectIds) | |
) | |
) | |
->setParameter('integration', $integration) | |
->setParameter('objectType', $objectType) | |
// 1. We must sort by f.object_id. Otherwise values stored in PartialObjectReportBuilder::lastProcessedTrackedId will be incorrect. | |
// 2. Newer updated fields must override older updated fields | |
->orderBy('f.object_id, f.modified_at', 'ASC'); | |
return $qb->executeQuery()->fetchAllAssociative(); | |
} | |
/** | |
* @param int $objectId | |
*/ | |
public function findChangesForObject(string $integration, string $objectType, $objectId): array | |
{ | |
// Get a list of object IDs so that we can get complete snapshots of the objects | |
$qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$qb | |
->select('*') | |
->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') | |
->where( | |
$qb->expr()->and( | |
$qb->expr()->eq('f.integration', ':integration'), | |
$qb->expr()->eq('f.object_type', ':objectType'), | |
$qb->expr()->eq('f.object_id', ':objectId') | |
) | |
) | |
->setParameter('integration', $integration) | |
->setParameter('objectType', $objectType) | |
->setParameter('objectId', (int) $objectId) | |
->orderBy('f.modified_at'); // Newer updated fields must override older updated fields | |
return $qb->executeQuery()->fetchAllAssociative(); | |
} | |
public function deleteOrphanLeadChanges(): int | |
{ | |
$totalDeleted = 0; | |
$limit = 1000; | |
$totalLimit = 100000; | |
$deletedInLastLoop = $limit; | |
while ($totalDeleted < $totalLimit && $deletedInLastLoop === $limit && $deleted = $this->doDeleteOrphanLeadChanges($limit)) { | |
$deletedInLastLoop = $deleted; | |
$totalDeleted += $deleted; | |
} | |
return $totalDeleted; | |
} | |
private function doDeleteOrphanLeadChanges(int $limit): int | |
{ | |
$qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$qb->select('f.id') | |
->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') | |
->leftJoin('f', MAUTIC_TABLE_PREFIX.'leads', 'l', 'l.id = f.object_id') | |
->where( | |
$qb->expr()->and( | |
$qb->expr()->eq('object_type', ':objectType'), | |
$qb->expr()->isNull('l.id') | |
) | |
) | |
->setMaxResults($limit) | |
->setParameter('objectType', Lead::class); | |
$objectIds = $qb->executeQuery()->fetchFirstColumn(); | |
if (!$objectIds) { | |
return 0; | |
} | |
$qb2 = $this->getEntityManager()->getConnection()->createQueryBuilder(); | |
$qb2->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') | |
->where( | |
$qb2->expr()->in('id', ':ids') | |
) | |
->setParameter('ids', $objectIds, ArrayParameterType::INTEGER); | |
return $qb2->executeStatement(); | |
} | |
} | |