summaryrefslogtreecommitdiffstatshomepage
path: root/core/lib/Drupal/Core/Cron.php
blob: 1773d810a25b9999137ef9f95e203f5ac9f183d1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
<?php

namespace Drupal\Core;

use Drupal\Component\Datetime\TimeInterface;
use Drupal\Component\Utility\Environment;
use Drupal\Component\Utility\Timer;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Lock\LockBackendInterface;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Session\AccountSwitcherInterface;
use Drupal\Core\Session\AnonymousUserSession;
use Drupal\Core\State\StateInterface;
use Drupal\Core\Utility\Error;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

/**
 * The Drupal core Cron service.
 */
class Cron implements CronInterface {

  /**
   * The queue config.
   *
   * @var array
   */
  protected array $queueConfig;

  /**
   * Constructs a cron object.
   *
   * @param \Drupal\Core\Extension\ModuleHandlerInterface $moduleHandler
   *   The module handler.
   * @param \Drupal\Core\Lock\LockBackendInterface $lock
   *   The lock service.
   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
   *   The queue service.
   * @param \Drupal\Core\State\StateInterface $state
   *   The state service.
   * @param \Drupal\Core\Session\AccountSwitcherInterface $accountSwitcher
   *   The account switching service.
   * @param \Psr\Log\LoggerInterface $logger
   *   A logger instance.
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queueManager
   *   The queue plugin manager.
   * @param \Drupal\Component\Datetime\TimeInterface $time
   *   The time service.
   * @param array $queue_config
   *   Queue configuration from the service container.
   */
  public function __construct(
    protected ModuleHandlerInterface $moduleHandler,
    protected LockBackendInterface $lock,
    protected QueueFactory $queueFactory,
    protected StateInterface $state,
    protected AccountSwitcherInterface $accountSwitcher,
    protected LoggerInterface $logger,
    protected QueueWorkerManagerInterface $queueManager,
    protected TimeInterface $time,
    array $queue_config,
  ) {
    $this->queueConfig = $queue_config + [
      'suspendMaximumWait' => 30.0,
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function run() {
    // Allow execution to continue even if the request gets cancelled.
    @ignore_user_abort(TRUE);

    // Force the current user to anonymous to ensure consistent permissions on
    // cron runs.
    $this->accountSwitcher->switchTo(new AnonymousUserSession());

    // Try to allocate enough time to run all the hook_cron implementations.
    Environment::setTimeLimit(240);

    $return = FALSE;

    // Try to acquire cron lock.
    if (!$this->lock->acquire('cron', 900.0)) {
      // Cron is still running normally.
      $this->logger->warning('Attempting to re-run cron while it is already running.');
    }
    else {
      $this->invokeCronHandlers();

      // Process cron queues.
      $this->processQueues();

      $this->setCronLastTime();

      // Release cron lock.
      $this->lock->release('cron');

      // Add watchdog message.
      $this->logger->info('Cron run completed.');

      // Return TRUE so other functions can check if it did run successfully
      $return = TRUE;
    }

    // Restore the user.
    $this->accountSwitcher->switchBack();

    return $return;
  }

  /**
   * Records and logs the request time for this cron invocation.
   */
  protected function setCronLastTime() {
    // Record cron time.
    $request_time = $this->time->getRequestTime();
    $this->state->set('system.cron_last', $request_time);
  }

  /**
   * Processes cron queues.
   */
  protected function processQueues() {
    $max_wait = (float) $this->queueConfig['suspendMaximumWait'];

    // Build a stack of queues to work on.
    /** @var array<array{process_from: int<0, max>, queue: \Drupal\Core\Queue\QueueInterface, worker: \Drupal\Core\Queue\QueueWorkerInterface}> $queues */
    $queues = [];
    foreach ($this->queueManager->getDefinitions() as $queue_name => $queue_info) {
      if (!isset($queue_info['cron'])) {
        continue;
      }
      $queue = $this->queueFactory->get($queue_name);
      // Make sure every queue exists. There is no harm in trying to recreate
      // an existing queue.
      $queue->createQueue();
      $worker = $this->queueManager->createInstance($queue_name);
      $queues[] = [
        // Set process_from to zero so each queue is always processed
        // immediately for the first time. This process_from timestamp will
        // change if a queue throws a delayable SuspendQueueException.
        'process_from' => 0,
        'queue' => $queue,
        'worker' => $worker,
      ];
    }

    // Work through stack of queues, re-adding to the stack when a delay is
    // necessary.
    while ($item = array_shift($queues)) {
      [
        'queue' => $queue,
        'worker' => $worker,
        'process_from' => $process_from,
      ] = $item;

      // Each queue will be processed immediately when it is reached for the
      // first time, as zero > currentTime will never be true.
      if ($process_from > $this->time->getCurrentMicroTime()) {
        $this->usleep((int) round($process_from - $this->time->getCurrentMicroTime(), 3) * 1000000);
      }

      try {
        $this->processQueue($queue, $worker);
      }
      catch (SuspendQueueException $e) {
        // Return to this queue after processing other queues if the delay is
        // within the threshold.
        if ($e->isDelayable() && ($e->getDelay() < $max_wait)) {
          $item['process_from'] = $this->time->getCurrentMicroTime() + $e->getDelay();
          // Place this queue back in the stack for processing later.
          array_push($queues, $item);
        }
      }

      // Reorder the queue by next 'process_from' timestamp.
      usort($queues, function (array $queueA, array $queueB) {
        return $queueA['process_from'] <=> $queueB['process_from'];
      });
    }
  }

  /**
   * Processes a cron queue.
   *
   * @param \Drupal\Core\Queue\QueueInterface $queue
   *   The queue.
   * @param \Drupal\Core\Queue\QueueWorkerInterface $worker
   *   The queue worker.
   *
   * @throws \Drupal\Core\Queue\SuspendQueueException
   *   If the queue was suspended.
   */
  protected function processQueue(QueueInterface $queue, QueueWorkerInterface $worker) {
    $lease_time = $worker->getPluginDefinition()['cron']['time'];
    $end = $this->time->getCurrentTime() + $lease_time;
    while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
      try {
        $worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (DelayedRequeueException $e) {
        // The worker requested the task not be immediately re-queued.
        // - If the queue doesn't support ::delayItem(), we should leave the
        // item's current expiry time alone.
        // - If the queue does support ::delayItem(), we should allow the
        // queue to update the item's expiry using the requested delay.
        if ($queue instanceof DelayableQueueInterface) {
          // This queue can handle a custom delay; use the duration provided
          // by the exception.
          $queue->delayItem($item, $e->getDelay());
        }
      }
      catch (RequeueException) {
        // The worker requested the task be immediately requeued.
        $queue->releaseItem($item);
      }
      catch (SuspendQueueException $e) {
        // If the worker indicates the whole queue should be skipped, release
        // the item and go to the next queue.
        $queue->releaseItem($item);

        $this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
          '@queue' => $worker->getPluginId(),
        ]);

        // Skip to the next queue.
        throw $e;
      }
      catch (\Exception $e) {
        // In case of any other kind of exception, log it and leave the item
        // in the queue to be processed again later.
        Error::logException($this->logger, $e);
      }
    }
  }

  /**
   * Invokes any cron handlers implementing hook_cron.
   */
  protected function invokeCronHandlers() {
    $module_previous = '';

    // If detailed logging isn't enabled, don't log individual execution times.
    $time_logging_enabled = \Drupal::config('system.cron')->get('logging');
    $logger = $time_logging_enabled ? $this->logger : new NullLogger();

    // Iterate through the modules calling their cron handlers (if any):
    $this->moduleHandler->invokeAllWith('cron', function (callable $hook, string $module) use (&$module_previous, $logger) {
      if (!$module_previous) {
        $logger->info('Starting execution of @module_cron().', [
          '@module' => $module,
        ]);
      }
      else {
        $logger->info('Starting execution of @module_cron(), execution of @module_previous_cron() took @time.', [
          '@module' => $module,
          '@module_previous' => $module_previous,
          '@time' => Timer::read('cron_' . $module_previous) . 'ms',
        ]);
      }
      Timer::start('cron_' . $module);

      // Do not let an exception thrown by one module disturb another.
      try {
        $hook();
      }
      catch (\Exception $e) {
        Error::logException($this->logger, $e);
      }

      Timer::stop('cron_' . $module);
      $module_previous = $module;
    });
    if ($module_previous) {
      $logger->info('Execution of @module_previous_cron() took @time.', [
        '@module_previous' => $module_previous,
        '@time' => Timer::read('cron_' . $module_previous) . 'ms',
      ]);
    }
  }

  /**
   * Delay execution in microseconds.
   *
   * @param int $microseconds
   *   Halt time in microseconds.
   */
  protected function usleep(int $microseconds): void {
    usleep($microseconds);
  }

}