8889841cPK1[ONqq&ScheduledTaskSubscribersRepository.phpnu[ */ class ScheduledTaskSubscribersRepository extends Repository { protected function getEntityClassName() { return ScheduledTaskSubscriberEntity::class; } public function isSubscriberProcessed(ScheduledTaskEntity $task, SubscriberEntity $subscriber): bool { $scheduledTaskSubscriber = $this ->doctrineRepository ->createQueryBuilder('sts') ->andWhere('sts.processed = 1') ->andWhere('sts.task = :task') ->andWhere('sts.subscriber = :subscriber') ->setParameter('subscriber', $subscriber) ->setParameter('task', $task) ->getQuery() ->getOneOrNullResult(); return !empty($scheduledTaskSubscriber); } public function createOrUpdate(array $data): ?ScheduledTaskSubscriberEntity { if (!isset($data['task_id'], $data['subscriber_id'])) { return null; } $taskSubscriber = $this->findOneBy(['task' => $data['task_id'], 'subscriber' => $data['subscriber_id']]); if (!$taskSubscriber) { $task = $this->entityManager->getReference(ScheduledTaskEntity::class, (int)$data['task_id']); $subscriber = $this->entityManager->getReference(SubscriberEntity::class, (int)$data['subscriber_id']); if (!$task || !$subscriber) throw new InvalidStateException(); $taskSubscriber = new ScheduledTaskSubscriberEntity($task, $subscriber); $this->persist($taskSubscriber); } $processed = $data['processed'] ?? ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED; $failed = $data['failed'] ?? ScheduledTaskSubscriberEntity::FAIL_STATUS_OK; $taskSubscriber->setProcessed($processed); $taskSubscriber->setFailed($failed); $this->flush(); return $taskSubscriber; } public function countSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId): int { $queryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId); $countSubscribers = $queryBuilder ->select('count(sts.subscriber)') ->getQuery() ->getSingleScalarResult(); return intval($countSubscribers); } public function getSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId, int $limit): array { $queryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId); $subscribersIds = $queryBuilder ->select('IDENTITY(sts.subscriber) AS subscriber_id') ->orderBy('sts.subscriber', 'asc') ->setMaxResults($limit) ->getQuery() ->getSingleColumnResult(); return $subscribersIds; } public function deleteByTask(ScheduledTaskEntity $scheduledTask): void { $this->entityManager->createQueryBuilder() ->delete(ScheduledTaskSubscriberEntity::class, 'sts') ->where('sts.task = :task') ->setParameter('task', $scheduledTask) ->getQuery() ->execute(); } /** * @param int[] $subscriberIds */ public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void { if ($subscriberIds) { $this->entityManager->createQueryBuilder() ->update(ScheduledTaskSubscriberEntity::class, 'sts') ->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED) ->where('sts.subscriber IN (:subscriberIds)') ->andWhere('sts.task = :task') ->setParameter('subscriberIds', $subscriberIds, Connection::PARAM_INT_ARRAY) ->setParameter('task', $task) ->getQuery() ->execute(); } $this->checkCompleted($task); } public function createSubscribersForBounceWorker(ScheduledTaskEntity $scheduledTaskEntity): void { $scheduledTaskSubscribersTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName(); $subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName(); $stmt = $this->entityManager->getConnection()->prepare(" INSERT IGNORE INTO " . $scheduledTaskSubscribersTable . " (task_id, subscriber_id, processed) SELECT :taskId AS task_id, s.`id` AS subscriber_id, :unprocessed AS processed FROM " . $subscribersTable . " s WHERE s.`deleted_at` IS NULL AND s.`status` IN (:subscribed, :unconfirmed) "); $stmt->bindValue('taskId', $scheduledTaskEntity->getId()); $stmt->bindValue('unprocessed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED); $stmt->bindValue('subscribed', SubscriberEntity::STATUS_SUBSCRIBED); $stmt->bindValue('unconfirmed', SubscriberEntity::STATUS_UNCONFIRMED); $stmt->executeQuery(); } public function deleteByScheduledTask(ScheduledTaskEntity $scheduledTask): void { $this->entityManager->createQueryBuilder() ->delete(ScheduledTaskSubscriberEntity::class, 'sts') ->where('sts.task = :task') ->setParameter('task', $scheduledTask) ->getQuery() ->execute(); } private function checkCompleted(ScheduledTaskEntity $task): void { $count = $this->countBy(['task' => $task, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]); if ($count === 0) { $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); $this->entityManager->flush(); } } private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder { return $this->entityManager ->createQueryBuilder() ->from(ScheduledTaskSubscriberEntity::class, 'sts') ->andWhere('sts.task = :taskId') ->andWhere('sts.subscriber > :lastProcessedSubscriberId') ->andWhere('sts.processed = :status') ->setParameter('taskId', $taskId) ->setParameter('lastProcessedSubscriberId', $lastProcessedSubscriberId) ->setParameter('status', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED); } } PK1[ 333ScheduledTasksRepository.phpnu[ */ class ScheduledTasksRepository extends Repository { /** @var WPFunctions */ private $wp; public function __construct( EntityManager $entityManager, WPFunctions $wp ) { $this->wp = $wp; parent::__construct($entityManager); } /** * @param NewsletterEntity $newsletter * @return ScheduledTaskEntity[] */ public function findByNewsletterAndStatus(NewsletterEntity $newsletter, string $status): array { return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->andWhere('st.status = :status') ->andWhere('sq.newsletter = :newsletter') ->setParameter('status', $status) ->setParameter('newsletter', $newsletter) ->getQuery() ->getResult(); } /** * @param NewsletterEntity $newsletter */ public function findOneByNewsletter(NewsletterEntity $newsletter): ?ScheduledTaskEntity { $scheduledTask = $this->doctrineRepository->createQueryBuilder('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->andWhere('sq.newsletter = :newsletter') ->orderBy('sq.updatedAt', 'desc') ->setMaxResults(1) ->setParameter('newsletter', $newsletter) ->getQuery() ->getOneOrNullResult(); // for phpstan because it detects mixed instead of entity return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null; } public function findOneBySendingQueue(SendingQueueEntity $sendingQueue): ?ScheduledTaskEntity { $scheduledTask = $this->doctrineRepository->createQueryBuilder('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->andWhere('sq.id = :sendingQueue') ->setMaxResults(1) ->setParameter('sendingQueue', $sendingQueue) ->getQuery() ->getOneOrNullResult(); // for phpstan because it detects mixed instead of entity return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null; } /** * @param NewsletterEntity $newsletter * @return ScheduledTaskEntity[] */ public function findByScheduledAndRunningForNewsletter(NewsletterEntity $newsletter): array { return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->andWhere('st.status = :status OR st.status IS NULL') ->andWhere('sq.newsletter = :newsletter') ->setParameter('status', NewsletterEntity::STATUS_SCHEDULED) ->setParameter('newsletter', $newsletter) ->getQuery() ->getResult(); } /** * @param NewsletterEntity $newsletter * @return ScheduledTaskEntity[] */ public function findByNewsletterAndSubscriberId(NewsletterEntity $newsletter, int $subscriberId): array { return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task') ->andWhere('sq.newsletter = :newsletter') ->andWhere('sts.subscriber = :subscriber') ->setParameter('newsletter', $newsletter) ->setParameter('subscriber', $subscriberId) ->getQuery() ->getResult(); } public function findOneScheduledByNewsletterAndSubscriber(NewsletterEntity $newsletter, SubscriberEntity $subscriber): ?ScheduledTaskEntity { $scheduledTask = $this->doctrineRepository->createQueryBuilder('st') ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task') ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task') ->andWhere('st.status = :status') ->andWhere('sq.newsletter = :newsletter') ->andWhere('sts.subscriber = :subscriber') ->setMaxResults(1) ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED) ->setParameter('newsletter', $newsletter) ->setParameter('subscriber', $subscriber) ->getQuery() ->getOneOrNullResult(); // for phpstan because it detects mixed instead of entity return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null; } public function findScheduledOrRunningTask(?string $type): ?ScheduledTaskEntity { $queryBuilder = $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->where('((st.status = :scheduledStatus) OR (st.status is NULL))') ->andWhere('st.deletedAt IS NULL') ->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED) ->setMaxResults(1) ->orderBy('st.scheduledAt', 'DESC'); if (!empty($type)) { $queryBuilder ->andWhere('st.type = :type') ->setParameter('type', $type); } return $queryBuilder->getQuery()->getOneOrNullResult(); } public function findScheduledTask(?string $type): ?ScheduledTaskEntity { $queryBuilder = $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->where('st.status = :scheduledStatus') ->andWhere('st.deletedAt IS NULL') ->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED) ->setMaxResults(1) ->orderBy('st.scheduledAt', 'DESC'); if (!empty($type)) { $queryBuilder ->andWhere('st.type = :type') ->setParameter('type', $type); } return $queryBuilder->getQuery()->getOneOrNullResult(); } public function findPreviousTask(ScheduledTaskEntity $task): ?ScheduledTaskEntity { return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->where('st.type = :type') ->setParameter('type', $task->getType()) ->andWhere('st.createdAt < :created') ->setParameter('created', $task->getCreatedAt()) ->orderBy('st.scheduledAt', 'DESC') ->setMaxResults(1) ->getQuery() ->getOneOrNullResult(); } public function findDueByType($type, $limit = null) { return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_SCHEDULED, $limit); } public function findRunningByType($type, $limit = null) { return $this->findByTypeAndStatus($type, null, $limit); } public function findCompletedByType($type, $limit = null) { return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_COMPLETED, $limit); } public function findFutureScheduledByType($type, $limit = null) { return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_SCHEDULED, $limit, true); } public function getCountsPerStatus(string $type = 'sending') { $stats = [ ScheduledTaskEntity::STATUS_COMPLETED => 0, ScheduledTaskEntity::STATUS_PAUSED => 0, ScheduledTaskEntity::STATUS_SCHEDULED => 0, ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING => 0, ]; $counts = $this->doctrineRepository->createQueryBuilder('st') ->select('COUNT(st.id) as value') ->addSelect('st.status') ->where('st.deletedAt IS NULL') ->andWhere('st.type = :type') ->setParameter('type', $type) ->addGroupBy('st.status') ->getQuery() ->getResult(); foreach ($counts as $count) { if ($count['status'] === null) { $stats[ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING] = (int)$count['value']; continue; } $stats[$count['status']] = (int)$count['value']; } return $stats; } /** * @param string|null $type * @param array $statuses * @param int $limit * @return array */ public function getLatestTasks( $type = null, $statuses = [ ScheduledTaskEntity::STATUS_COMPLETED, ScheduledTaskEntity::STATUS_SCHEDULED, ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING, ], $limit = Scheduler::TASK_BATCH_SIZE ) { $result = []; foreach ($statuses as $status) { $tasksQuery = $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->where('st.deletedAt IS NULL') ->where('st.status = :status'); if ($status === ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING) { $tasksQuery = $tasksQuery->orWhere('st.status IS NULL'); } if ($type) { $tasksQuery = $tasksQuery->andWhere('st.type = :type') ->setParameter('type', $type); } $tasks = $tasksQuery ->setParameter('status', $status) ->setMaxResults($limit) ->orderBy('st.id', 'desc') ->getQuery() ->getResult(); $result = array_merge($result, $tasks); } return $result; } /** * @return ScheduledTaskEntity[] */ public function findRunningSendingTasks(?int $limit = null): array { return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join('st.sendingQueue', 'sq') ->where('st.type = :type') ->andWhere('st.status IS NULL') ->andWhere('st.deletedAt IS NULL') ->orderBy('st.priority', 'ASC') ->addOrderBy('st.updatedAt', 'ASC') ->setMaxResults($limit) ->setParameter('type', SendingQueue::TASK_TYPE) ->getQuery() ->getResult(); } /** * @param string $type * @param SubscriberEntity $subscriber * @return ScheduledTaskEntity[] * @throws \MailPoetVendor\Doctrine\ORM\NonUniqueResultException */ public function findByTypeAndSubscriber(string $type, SubscriberEntity $subscriber): array { $query = $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task') ->where('st.type = :type') ->andWhere('sts.subscriber = :subscriber') ->andWhere('st.deletedAt IS NULL') ->andWhere('st.status = :status') ->setParameter('type', $type) ->setParameter('subscriber', $subscriber->getId()) ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED) ->getQuery(); $tasks = $query->getResult(); return $tasks; } public function touchAllByIds(array $ids): void { $now = CarbonImmutable::createFromTimestamp((int)$this->wp->currentTime('timestamp')); $this->entityManager->createQueryBuilder() ->update(ScheduledTaskEntity::class, 'st') ->set('st.updatedAt', ':updatedAt') ->setParameter('updatedAt', $now) ->where('st.id IN (:ids)') ->setParameter('ids', $ids, Connection::PARAM_INT_ARRAY) ->getQuery() ->execute(); } /** * @return ScheduledTaskEntity[] */ public function findScheduledSendingTasks(?int $limit = null): array { $now = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); return $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->join('st.sendingQueue', 'sq') ->where('st.deletedAt IS NULL') ->andWhere('st.status = :status') ->andWhere('st.scheduledAt <= :now') ->andWhere('st.type = :type') ->orderBy('st.updatedAt', 'ASC') ->setMaxResults($limit) ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED) ->setParameter('now', $now) ->setParameter('type', SendingQueue::TASK_TYPE) ->getQuery() ->getResult(); } public function invalidateTask(ScheduledTaskEntity $task): void { $task->setStatus( ScheduledTaskEntity::STATUS_INVALID); $this->persist($task); $this->flush(); } protected function findByTypeAndStatus($type, $status, $limit = null, $future = false) { $queryBuilder = $this->doctrineRepository->createQueryBuilder('st') ->select('st') ->where('st.type = :type') ->setParameter('type', $type) ->andWhere('st.deletedAt IS NULL'); if (is_null($status)) { $queryBuilder->andWhere('st.status IS NULL'); } else { $queryBuilder ->andWhere('st.status = :status') ->setParameter('status', $status); } if ($future) { $queryBuilder->andWhere('st.scheduledAt > :now'); } else { $queryBuilder->andWhere('st.scheduledAt <= :now'); } $now = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); $queryBuilder->setParameter('now', $now); if ($limit) { $queryBuilder->setMaxResults($limit); } return $queryBuilder->getQuery()->getResult(); } protected function getEntityClassName() { return ScheduledTaskEntity::class; } } PK1[R3qq-ScheduledTaskSubscribersListingRepository.phpnu[queryBuilder; $this->applyFromClause($queryBuilder); $this->applyParameters($queryBuilder, $definition->getParameters()); // total count $countQueryBuilder = clone $queryBuilder; $countQueryBuilder->select('COUNT(sts.subscriber) AS subscriberCount'); $totalCount = intval($countQueryBuilder->getQuery()->getSingleScalarResult()); // Sent count $sentCountQuery = clone $queryBuilder; $sentCountQuery->select('COUNT(sts.subscriber) AS subscriberCount'); $sentCountQuery->andWhere('sts.processed = :processedStatus'); $sentCountQuery->andWhere('sts.failed = :failedStatus'); $sentCountQuery->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_PROCESSED); $sentCountQuery->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_OK); $sentCount = intval($sentCountQuery->getQuery()->getSingleScalarResult()); // Failed count $failedCountQuery = clone $queryBuilder; $failedCountQuery->select('COUNT(sts.subscriber) AS subscriberCount'); $failedCountQuery->andWhere('sts.failed = :failedStatus'); $failedCountQuery->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED); $failedCount = intval($failedCountQuery->getQuery()->getSingleScalarResult()); // Unprocessed count $unprocessedCountQuery = clone $queryBuilder; $unprocessedCountQuery->select('COUNT(sts.subscriber) AS subscriberCount'); $unprocessedCountQuery->andWhere('sts.processed = :processedStatus'); $unprocessedCountQuery->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED); $unprocessedCount = intval($unprocessedCountQuery->getQuery()->getSingleScalarResult()); return [ [ 'name' => 'all', 'label' => __('All', 'mailpoet'), 'count' => $totalCount, ], [ 'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT, 'label' => __('Sent', 'mailpoet'), 'count' => $sentCount, ], [ 'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED, 'label' => __('Failed', 'mailpoet'), 'count' => $failedCount, ], [ 'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED, 'label' => __('Unprocessed', 'mailpoet'), 'count' => $unprocessedCount, ], ]; } protected function applySelectClause(QueryBuilder $queryBuilder) { $queryBuilder->select("PARTIAL sts.{task,subscriber,processed,failed,error,createdAt,updatedAt}, PARTIAL s.{id, email, firstName, lastName}"); } protected function applyFromClause(QueryBuilder $queryBuilder) { $queryBuilder->from(ScheduledTaskSubscriberEntity::class, 'sts') ->leftJoin('sts.subscriber', 's'); } protected function applyGroup(QueryBuilder $queryBuilder, string $group) { if ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT) { $queryBuilder->andWhere('sts.processed = :processedStatus'); $queryBuilder->andWhere('sts.failed = :failedStatus'); $queryBuilder->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_PROCESSED); $queryBuilder->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_OK); } elseif ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED) { $queryBuilder->andWhere('sts.failed = :failedStatus'); $queryBuilder->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED); } elseif ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED) { $queryBuilder->andWhere('sts.processed = :processedStatus'); $queryBuilder->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED); } } protected function applySorting(QueryBuilder $queryBuilder, string $sortBy, string $sortOrder) { // ScheduledTaskSubscriber doesn't have id column so the default fallback value 'id' // generated in MailPoet\Listing\Handler needs to be changed to something else if ($sortBy === 'id') { $sortBy = 'sts.subscriber'; } elseif ($sortBy === 'subscriberId') { // Ordering by subscriberId is mapped to email for consistency with Subscriber listing $sortBy = 's.email'; } else { $sortBy = "sts.{$sortBy}"; } $queryBuilder->addOrderBy($sortBy, $sortOrder); } protected function applySearch(QueryBuilder $queryBuilder, string $search) { $search = Helpers::escapeSearch($search); $queryBuilder ->andWhere('s.email LIKE :search or s.firstName LIKE :search or s.lastName LIKE :search') ->setParameter('search', "%$search%"); } protected function applyFilters(QueryBuilder $queryBuilder, array $filters) { // the parent class requires this method, but scheduled task subscribers listing doesn't currently support this feature. } protected function applyParameters(QueryBuilder $queryBuilder, array $parameters) { if (isset($parameters['task_ids']) && !empty($parameters['task_ids'])) { $queryBuilder->andWhere('sts.task IN (:taskIds)') ->setParameter('taskIds', $parameters['task_ids']); } } public function getCount(ListingDefinition $definition): int { $queryBuilder = clone $this->queryBuilder; $this->applyFromClause($queryBuilder); $this->applyConstraints($queryBuilder, $definition); $queryBuilder->select("COUNT(DISTINCT sts.subscriber)"); return intval($queryBuilder->getQuery()->getSingleScalarResult()); } } PK1[ index.phpnu[ */ class SendingQueuesRepository extends Repository { /** @var ScheduledTaskSubscribersRepository */ private $scheduledTaskSubscribersRepository; /** @var WPFunctions */ private $wp; public function __construct( EntityManager $entityManager, WPFunctions $wp, ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository ) { parent::__construct($entityManager); $this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository; $this->wp = $wp; } protected function getEntityClassName() { return SendingQueueEntity::class; } public function findOneByNewsletterAndTaskStatus(NewsletterEntity $newsletter, string $status): ?SendingQueueEntity { return $this->entityManager->createQueryBuilder() ->select('s') ->from(SendingQueueEntity::class, 's') ->join('s.task', 't') ->where('t.status = :status') ->andWhere('s.newsletter = :newsletter') ->setParameter('status', $status) ->setParameter('newsletter', $newsletter) ->getQuery() ->getOneOrNullResult(); } public function countAllByNewsletterAndTaskStatus(NewsletterEntity $newsletter, string $status): int { return intval($this->entityManager->createQueryBuilder() ->select('count(s.task)') ->from(SendingQueueEntity::class, 's') ->join('s.task', 't') ->where('t.status = :status') ->andWhere('s.newsletter = :newsletter') ->setParameter('status', $status) ->setParameter('newsletter', $newsletter) ->getQuery() ->getSingleScalarResult()); } public function getTaskIdsByNewsletterId(int $newsletterId): array { $results = $this->entityManager->createQueryBuilder() ->select('IDENTITY(s.task) as task_id') ->from(SendingQueueEntity::class, 's') ->andWhere('s.newsletter = :newsletter') ->setParameter('newsletter', $newsletterId) ->getQuery() ->getArrayResult(); return array_map('intval', array_column($results, 'task_id')); } public function isSubscriberProcessed(SendingQueueEntity $queue, SubscriberEntity $subscriber): bool { $task = $queue->getTask(); if (is_null($task)) return false; return $this->scheduledTaskSubscribersRepository->isSubscriberProcessed($task, $subscriber); } /** * @return SendingQueueEntity[] */ public function findAllForSubscriberSentBetween( SubscriberEntity $subscriber, ?\DateTimeInterface $dateTo, ?\DateTimeInterface $dateFrom ): array { $qb = $this->entityManager->createQueryBuilder() ->select('s, n') ->from(SendingQueueEntity::class, 's') ->join('s.task', 't') ->join('t.subscribers', 'tsub') ->join('s.newsletter', 'n') ->where('t.status = :status') ->setParameter('status', ScheduledTaskEntity::STATUS_COMPLETED) ->andWhere('t.type = :sendingType') ->setParameter('sendingType', 'sending') ->andWhere('tsub.subscriber = :subscriber') ->setParameter('subscriber', $subscriber); if ($dateTo) { $qb->andWhere('t.updatedAt < :dateTo') ->setParameter('dateTo', $dateTo); } if ($dateFrom) { $qb->andWhere('t.updatedAt > :dateFrom') ->setParameter('dateFrom', $dateFrom); } return $qb->getQuery()->getResult(); } public function pause(SendingQueueEntity $queue): void { if ($queue->getCountProcessed() !== $queue->getCountTotal()) { $task = $queue->getTask(); if ($task instanceof ScheduledTaskEntity) { $task->setStatus(ScheduledTaskEntity::STATUS_PAUSED); $this->flush(); } } } public function resume(SendingQueueEntity $queue): void { $task = $queue->getTask(); if (!$task instanceof ScheduledTaskEntity) return; if ($queue->getCountProcessed() === $queue->getCountTotal()) { $processedAt = Carbon::createFromTimestamp($this->wp->currentTime('mysql')); $task->setProcessedAt($processedAt); $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); // Update also status of newsletter if necessary $newsletter = $queue->getNewsletter(); if ($newsletter instanceof NewsletterEntity && $newsletter->canBeSetSent()) { $newsletter->setStatus(NewsletterEntity::STATUS_SENT); } $this->flush(); } else { $newsletter = $queue->getNewsletter(); if (!$newsletter instanceof NewsletterEntity) return; if ($newsletter->getStatus() === NewsletterEntity::STATUS_CORRUPT) { // force a re-render $queue->setNewsletterRenderedBody(null); $this->persist($queue); } $newsletter->setStatus(NewsletterEntity::STATUS_SENDING); $task->setStatus(null); $this->flush(); } } public function deleteByTask(ScheduledTaskEntity $scheduledTask): void { $this->entityManager->createQueryBuilder() ->delete(SendingQueueEntity::class, 'sq') ->where('sq.task = :task') ->setParameter('task', $scheduledTask) ->getQuery() ->execute(); } public function saveCampaignId(SendingQueueEntity $queue, string $campaignId): void { $meta = $queue->getMeta(); if (!is_array($meta)) { $meta = []; } $meta['campaignId'] = $campaignId; $queue->setMeta($meta); $this->flush(); } } PK1[ONqq&ScheduledTaskSubscribersRepository.phpnu[PK1[ 333ScheduledTasksRepository.phpnu[PK1[R3qq-LScheduledTaskSubscribersListingRepository.phpnu[PK1[ rdindex.phpnu[PK1[M.edSendingQueuesRepository.phpnu[PK{