Additional Modules

Having the todos module in place, let’s create two additional modules.

Analytics Module

The analytics module has a similar structure to the todos module. It consists of a TodosCounter, which stores some basic stats, a command handler, and an event handler:

from commands import CreateTodo
from events import TodoWasCompleted

from lato import ApplicationModule


class TodosCounter:
    def __init__(self):
        self.created_todos = 0
        self.completed_todos = 0


analytics = ApplicationModule("analytics")


@analytics.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, counter: TodosCounter):
    counter.created_todos += 1


@analytics.handler(TodoWasCompleted)
def on_todo_was_completed(event: TodoWasCompleted, counter: TodosCounter) -> None:
    counter.completed_todos += 1

It’s worth noting that CreateTodo is now handled by two modules (todos and analytics). When the application receives this command, both modules will be triggered. In a similar way, multiple modules can respond to one query. This pattern is called Message Composition and Decomposition.
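The fan-out described above can be illustrated without Lato. The following is a minimal sketch, assuming a hypothetical MiniDispatcher class that is not part of Lato's API. It only shows how one message can reach handlers registered by several modules and how their results can be collected. The names store_todo, count_todo, stored_titles, and stats are likewise made up for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class CreateTodo:
    title: str


class MiniDispatcher:
    """Hypothetical stand-in for an application; not Lato's implementation."""

    def __init__(self):
        # Maps a message type to the list of handlers registered for it.
        self._handlers = defaultdict(list)

    def handler(self, message_type):
        def decorator(func):
            self._handlers[message_type].append(func)
            return func

        return decorator

    def execute(self, message):
        # Decomposition: the message goes to every registered handler.
        # Composition: the individual results are gathered into one list.
        return [handle(message) for handle in self._handlers[type(message)]]


dispatcher = MiniDispatcher()
stored_titles = []  # stand-in for the todos module's storage
stats = {"created_todos": 0}  # stand-in for TodosCounter


@dispatcher.handler(CreateTodo)
def store_todo(command):
    stored_titles.append(command.title)
    return "todos"


@dispatcher.handler(CreateTodo)
def count_todo(command):
    stats["created_todos"] += 1
    return "analytics"


results = dispatcher.execute(CreateTodo(title="Publish an article"))
```

After the call, both handlers have run once and results holds one entry per handler, in registration order. How Lato itself merges handler results is not covered by this sketch.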

Notifications Module

The notifications module uses a fake NotificationService to push a message when TodoWasCompleted is published by the todos module. It also demonstrates how the notifications module queries the todos module to get the title of the todo that has been completed:

from events import TodoWasCompleted
from queries import GetTodoDetails
from todos import TodoReadModel

from lato import ApplicationModule, TransactionContext


class NotificationService:
    def push(self, message):
        print(message)


notifications = ApplicationModule("notifications")


@notifications.handler(TodoWasCompleted)
def on_todo_was_completed(
    event: TodoWasCompleted, service: NotificationService, ctx: TransactionContext
):
    details: TodoReadModel = ctx.execute(GetTodoDetails(todo_id=event.todo_id))
    print(details)
    message = f"A todo {details.title} was completed"
    service.push(message)

Note that the query is dispatched using a TransactionContext. This is the same context in which the todos module published the TodoWasCompleted event:

@todos.handler(CompleteTodo)
def handle_complete_todo(
    command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
    a_todo = repo.get_by_id(command.todo_id)
    a_todo.mark_as_completed(now)
    ctx.publish(TodoWasCompleted(todo_id=a_todo.id))
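To make the role of the shared context more concrete, here is a minimal sketch that does not use Lato. It assumes a hypothetical MiniContext class whose publish method passes itself to every event handler, so a handler can run a query inside the same context. The helper names (on_query, on_event, get_todo_details, notify) and the sample data are illustrative assumptions, not Lato's API.

```python
from dataclasses import dataclass


@dataclass
class TodoWasCompleted:
    todo_id: int


@dataclass
class GetTodoDetails:
    todo_id: int


class MiniContext:
    """Hypothetical stand-in for a transaction context; not Lato's implementation."""

    def __init__(self):
        self._query_handlers = {}
        self._event_handlers = {}

    def on_query(self, query_type, func):
        self._query_handlers[query_type] = func

    def on_event(self, event_type, func):
        self._event_handlers.setdefault(event_type, []).append(func)

    def execute(self, query):
        # Route the query to its single registered handler.
        return self._query_handlers[type(query)](query)

    def publish(self, event):
        # Each event handler receives this same context, so it can
        # dispatch further messages while the event is being processed.
        for handle in self._event_handlers.get(type(event), []):
            handle(event, self)


titles = {1: "Publish an article"}  # stand-in for the todos read model
pushed = []  # stand-in for NotificationService


def get_todo_details(query):
    return titles[query.todo_id]


def notify(event, ctx):
    # The handler queries another module through the context it was given.
    title = ctx.execute(GetTodoDetails(todo_id=event.todo_id))
    pushed.append(f"A todo {title} was completed")


ctx = MiniContext()
ctx.on_query(GetTodoDetails, get_todo_details)
ctx.on_event(TodoWasCompleted, notify)
ctx.publish(TodoWasCompleted(todo_id=1))
```

Publishing the event runs notify, which in turn executes GetTodoDetails on the very context that delivered the event, mirroring the interaction between the todos and notifications modules.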

In the next section, we will see how modules are wired together into the application and how dependencies are provided to message handlers.