Task Manager
The task manager is a bundle that builds in top of Symfony Messenger to add additional convenience functionality for running, logging and queuing tasks.
As this bundle adds quite a lot of functionality on top of Symfonys messenger system, we use the names "task" instead of "message", and "task handler" instead of "message handler" to make clear, that these are the extended versions.
Installation
Install the bundle:
composer require 21torr/task-manager
If you are using Symfony Flex you are all set. If not, you need to create an initial configuration file like described below.
Configuration
While this bundle auto-detects all queue names, you should define them manually in your config, as otherwise the priority between these queues might be wrong:
task_manager:
queues:
# queues sorted by priority. Highest priority at the top
- app_very_urgent
- app_urgent
- app
Defining Tasks
You have to define your tasks by creating a new task class extending the Task
base class:
use Torr\TaskManager\Task\Task;
class readonly PimImportTask extends Task
{
public function __construct (
private string $locale,
)
{
parent::__construct();
}
/**
*
*/
public function getMetaData () : TaskMetaData
{
return new TaskMetaData(
label: "PIM Import: {$this->locale}",
group: "PIM",
uniqueTaskId: "pim.import.{$this->locale}",
);
}
}
Your task object needs to be serializable. Avoid unnecessary state, e.g. you should always generate the metadata object on-the-fly.
Metadata
The task has to define some metadata, to have a convenient integration into automated tools.
Label
An identifying label for the task and it's configuration. You can use some or all parameters here, to improve readability.
Group
An optional group label. Used for grouping related tasks when building UI (like the CLI mentioned below).
Unique Task Id
The unique task id is used for uniquely identifying the type of certain tasks, to avoid registering them multiple times.
Unique Tasks
By default, tasks can be registered even if the same task is already added in the queue. For a lot of tasks this is quite wasteful, as running the same task consecutively multiple times will have the exact same result.
Example: you register a task to regenerate all image caches. If you change multiple images, you might want to register this task multiple times, once after every image change. By using correct task priority and putting the cache regeneration after all "adding images" tasks, you only need to run the task once.
Every task can optionally define a unique task id that is used to identify the same tasks:
return new TaskMetaData(
// ...
uniqueTaskId: "pim.import.{$this->locale}",
);
When queueing the task, the task manager will first loop through all registered tasks in all queues and look for a task with the same unique task id. If one is found, the new task is not queued.
You can and should use config state of the task object to have proper identifying task ids.
In the PIM import example, you should use the locale to generate the task id — otherwise the EN and FR pim imports might use the same task id, and you would erroneously assume you don't have to run the task.
Registering Tasks
The bundle provides a way to register your tasks in a global registry, that then can be used to build a UI around automatically queueing tasks.
You can integrate your existing task classes by registering them in the event:
use Symfony\Component\EventDispatcher\Attribute\AsEventListener;
use Torr\TaskManager\Event\RegisterTasksEvent;
class RegisterTasksListener
{
#[AsEventListener]
public function onRegisterTasks (RegisterTasksEvent $event) : void
{
$event->register(new PimImportTask("de"));
}
}
By default, your tasks are not auto-detected, so you can have "internal" tasks that are not manually selectable.
You can view all registered tasks in the debug command:
bin/console task-manager:debug
Queueing Tasks
You can then queue these tasks using the TaskManager
:
use Torr\TaskManager\Manager\TaskManager;
public function example (TaskManager $taskManager)
{
$englishPimImport = new PimImportTask("en");
$taskManager->enqueue($englishPimImport);
}
If you need to pass stamps, you can do that in the second parameter:
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
$taskManager->enqueue(new PimImportTask("en"), [
new DispatchAfterCurrentBusStamp(),
]);
When queueing tasks inside a task handler, you should always add the DispatchAfterCurrentBusStamp
.
You can also queue tasks via the CLI:
You can automatically queue registered tasks via the CLI:
bin/console task-manager:queue
You can either interactively select the tasks or directly pass the task IDs to the command:
bin/console task-manager:queue task-id-1 task-id-2
Task Handlers
As this bundle builds on top of Symfony messages, you define your task handler as message handler:
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Torr\TaskManager\Director\TaskDirector;
final readonly class PimImportTaskHandler
{
public function __construct (
private TaskDirector $taskDirector,
) {}
#[AsMessageHandler]
public function onPimImport (PimImportTask $task) : void
{
// start the run
$run = $this->taskDirector->startRun($task);
// you can get the IO from the run director
$io = $run->getIo();
// ... do your thing ...
// at the end you mark your task as finished and indicate, whether the task was handled successfully
$run->finish(success: true);
}
}
When your task failed, you have two options: you can either throw an exception, to mark this run as failed (and let the retry functionality kick in) or you can finish handling the task, but mark it as success: false
.
You use the TaskDirector
to start a run for the given task. This will return a RunDirector
, that helps you with working through your run:
- you can get IO to display information on the console
- the IO output will automatically be logged as well
- you can mark the run as success / failure
While not explicitly ->finish()
ed tasks will be handled properly, you should always finish
all your task runs.
Task Log
The TaskDirector
and RunDirector
automatically log the output of your tasks, whether they succeeded and some metadata. This includes the task object itself, so it must be serializable.
You can view the task log via the CLI:
bin/console task-manager:log
The bundle only contains the entities, no migrations. You might need to generate the migrations yourself, if you use the doctrine migrations bundle.
Running the Message Handler
Internally the task manager uses Symfony Messenger, so to run the tasks, you just consume the messages:
bin/console messenger:consume $(bin/console task-manager:messenger:queue-names)
This command automatically uses the queues in the correct (by priority in descending) order. Schedules automatically stay at the top of the priority list.
Debugging
You can debug your detected transports, the priority and the registered task via the CLI:
bin/console task-manager:debug