First Module

Conceptually, the application module is a collection of message handlers. The handler is a function that accepts a message, and other dependencies required for message processing. It then processes the message in a specific way, typically by reading some objects (in our case todo items) from a database, or changing their state (i.e. updating the state). As a side effect of message processing, new events may be published, which successively are handled by event handlers.

Declaring a module

Lets start by declaring the core module of our application - todos module. The ApplicationModule employs a decorator pattern to link handlers for different messages within one module.

from lato import ApplicationModule, TransactionContext

todos = ApplicationModule("todos")

Now, given the TodoModel and TodoRepository, we are finally ready to implement all the handlers. Let’s go through the handlers one by one:

Command handlers

Below is the handler for creating a new todo. The first argument is the command, and second one is the todo repository. The repository serves as a dependency and it will be provided by the application at the time of handler execution.

@todos.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, repo: TodoRepository):
    new_todo = TodoModel(
        id=command.todo_id,
        title=command.title,
        description=command.description,
        due_at=command.due_at,
    )
    repo.add(new_todo)

One of the core features of Lato is the ability to automatically inject handler dependencies - see Dependency Injection for more details.

Publishing events

A command handler for completing a todo follows a similar pattern. However, it introduces to additional dependencies:

@todos.handler(CompleteTodo)
def handle_complete_todo(
    command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
    a_todo = repo.get_by_id(command.todo_id)
    a_todo.mark_as_completed(now)
    ctx.publish(TodoWasCompleted(todo_id=a_todo.id))

The dependencies are:

  • ctx - TransactionContext object. Transaction Context is a core concept in lato, responsible for managing a context in which messages are being processed. Here we are using it to publish an event.

  • now - the current datetime.

The ctx.publish method is used to publish the event to any subscriber that is interested in its processing. In this case we are emitting TodoWasCompleted, which could handled both Analytics module to update productivity stats, and by Notifications (i.e. to send an email).

Note

Events play a crucial role in achieving loose coupling by allowing communication between different parts of the system without direct dependencies. When one handler publishes a event, it doesn’t need to know which other handlers are interested in that event. Thus, the sender and receiver are decoupled.

Query handlers

The role query handlers is to retrieve data, without introducing any changes to the state of the application. By separating the query handling logic from command handling logic, we can optimize the data retrieval process, and achieve a higher degree of decoupling between the components responsible for reading and writing.

Here is the handler for retrieving all todos:

@todos.handler(GetAllTodos)
def get_all_todos(
    query: GetAllTodos, repo: TodoRepository, now: datetime
) -> list[TodoReadModel]:
    result = repo.get_all()
    return [todo_model_to_read_model(todo, now) for todo in result]

and the handler from retrieving completed and not completed todos, depending on the query payload:

@todos.handler(GetSomeTodos)
def get_some_todos(query: GetSomeTodos, repo: TodoRepository, now: datetime):
    if query.completed is None:
        result = repo.get_all()
    else:
        result = (
            repo.get_all_completed()
            if query.completed
            else repo.get_all_not_completed()
        )

    return [todo_model_to_read_model(todo, now) for todo in result]

It’s worth noting that both query handlers return read models. Here is the implementation of mapping from a domain model to a read model:

def todo_model_to_read_model(todo: TodoModel, now: datetime) -> TodoReadModel:
    return TodoReadModel(
        id=todo.id,
        title=todo.title,
        description=todo.description,
        is_due=todo.is_due(now),
        is_completed=todo.is_completed,
    )

Let’s wrap it up:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from uuid import UUID

from commands import CompleteTodo, CreateTodo
from events import TodoWasCompleted
from queries import GetAllTodos, GetSomeTodos, GetTodoDetails

from lato import ApplicationModule, TransactionContext

todos = ApplicationModule("todos")


@dataclass
class TodoModel:
    """Model representing a todo"""

    id: UUID
    title: str
    description: str = ""
    due_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def is_completed(self) -> bool:
        return self.completed_at is not None

    def is_due(self, now: datetime) -> bool:
        if self.due_at is None or self.is_completed is False:
            return False
        return self.due_at < now

    def mark_as_completed(self, when: datetime) -> None:
        self.completed_at = when


@dataclass
class TodoReadModel:
    """Read model exposed to the external world"""

    id: UUID
    title: str
    description: str
    is_due: bool
    is_completed: bool


def todo_model_to_read_model(todo: TodoModel, now: datetime) -> TodoReadModel:
    return TodoReadModel(
        id=todo.id,
        title=todo.title,
        description=todo.description,
        is_due=todo.is_due(now),
        is_completed=todo.is_completed,
    )


class TodoRepository:
    """A repository of todos"""

    def __init__(self):
        self.items: list[TodoModel] = []

    def add(self, item: TodoModel) -> None:
        self.items.append(item)

    def get_by_id(self, todo_id: UUID) -> TodoModel:
        for item in self.items:
            if item.id == todo_id:
                return item
        raise ValueError(f"Todo with id {todo_id} does not exist")

    def get_all(self) -> list[TodoModel]:
        return self.items

    def get_all_completed(self):
        return [todo for todo in self.items if todo.is_completed]

    def get_all_not_completed(self):
        return [todo for todo in self.items if not todo.is_completed]


@todos.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, repo: TodoRepository):
    new_todo = TodoModel(
        id=command.todo_id,
        title=command.title,
        description=command.description,
        due_at=command.due_at,
    )
    repo.add(new_todo)


@todos.handler(CompleteTodo)
def handle_complete_todo(
    command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
    a_todo = repo.get_by_id(command.todo_id)
    a_todo.mark_as_completed(now)
    ctx.publish(TodoWasCompleted(todo_id=a_todo.id))


@todos.handler(GetTodoDetails)
def get_todo_details(query: GetTodoDetails, repo: TodoRepository, now: datetime):
    a_todo = repo.get_by_id(query.todo_id)
    return todo_model_to_read_model(a_todo, now)


@todos.handler(GetAllTodos)
def get_all_todos(
    query: GetAllTodos, repo: TodoRepository, now: datetime
) -> list[TodoReadModel]:
    result = repo.get_all()
    return [todo_model_to_read_model(todo, now) for todo in result]


@todos.handler(GetSomeTodos)
def get_some_todos(query: GetSomeTodos, repo: TodoRepository, now: datetime):
    if query.completed is None:
        result = repo.get_all()
    else:
        result = (
            repo.get_all_completed()
            if query.completed
            else repo.get_all_not_completed()
        )

    return [todo_model_to_read_model(todo, now) for todo in result]