Additional Modules¶
Having the todos module in place, let’s create two additional modules.
Analytics Module¶
The analytics module has a similar structure to todos module. It consists of a TodosCounter
which stores
some basic stats the command handler, and the event handler:
from commands import CreateTodo
from events import TodoWasCompleted
from lato import ApplicationModule
class TodosCounter:
def __init__(self):
self.created_todos = 0
self.completed_todos = 0
analytics = ApplicationModule("analytics")
@analytics.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, counter: TodosCounter):
counter.created_todos += 1
@analytics.handler(TodoWasCompleted)
def on_todo_was_completed(event: TodoWasCompleted, counter: TodosCounter) -> None:
counter.completed_todos += 1
It’s worth noting that CreateTodo
is handled by two modules now ( todos and analytics). When the application receives
a this command, both modules will be triggered. In a similar way, multiple modules can respond to one query.
This pattern is called Message Composition and Decomposition.
Notifications Module¶
Notifications module is using a fake NotificationService
to push the message when the TodoWasCompleted
is published by the todos module. In addition, it demonstrates how notifications module is querying todos module
to get the title of a todo that has been completed:
from events import TodoWasCompleted
from queries import GetTodoDetails
from todos import TodoReadModel
from lato import ApplicationModule, TransactionContext
class NotificationService:
def push(self, message):
print(message)
notifications = ApplicationModule("notifications")
@notifications.handler(TodoWasCompleted)
def on_todo_was_completed(
event: TodoWasCompleted, service: NotificationService, ctx: TransactionContext
):
details: TodoReadModel = ctx.execute(GetTodoDetails(todo_id=event.todo_id))
print(details)
message = f"A todo {details.title} was completed"
service.push(message)
In particular, the query is dispatched using a TransactionContext
,
which is the context in which the TodoWasCompleted
event was published earlier:
@todos.handler(CompleteTodo)
def handle_complete_todo(
command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
a_todo = repo.get_by_id(command.todo_id)
a_todo.mark_as_completed(now)
ctx.publish(TodoWasCompleted(todo_id=a_todo.id))
In the next section we will see how modules are wired together into the application, and how the dependencies are provided to messages handlers.