First Module
Conceptually, an application module is a collection of message handlers. A handler is a function that accepts a message, along with any other dependencies required to process it. It then processes the message in a specific way, typically by reading some objects (in our case todo items) from a database or by changing their state. As a side effect of message processing, new events may be published, which are in turn handled by event handlers.
Declaring a module
Let’s start by declaring the core module of our application, the todos module. The ApplicationModule employs a decorator pattern to link handlers for different messages within one module.
from lato import ApplicationModule, TransactionContext
todos = ApplicationModule("todos")
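To make the decorator pattern concrete, here is a minimal sketch of how a decorator can register handlers under a message type. MiniModule and Ping are hypothetical names invented for this illustration; this is a toy stand-in and not how Lato's ApplicationModule is actually implemented.

```python
from typing import Callable, Dict, List, Type


class MiniModule:
    """Toy stand-in for a module; NOT Lato's ApplicationModule."""

    def __init__(self, name: str) -> None:
        self.name = name
        # Maps a message class to the functions registered for it.
        self.handlers: Dict[Type, List[Callable]] = {}

    def handler(self, message_type: Type) -> Callable:
        """Return a decorator that registers a function for message_type."""

        def decorator(func: Callable) -> Callable:
            self.handlers.setdefault(message_type, []).append(func)
            return func  # the decorated function itself is left unchanged

        return decorator


class Ping:
    """Hypothetical message type used only in this sketch."""


demo = MiniModule("demo")


@demo.handler(Ping)
def handle_ping(message: Ping) -> str:
    return "pong"


# The decorator has recorded the function under its message type.
registered = demo.handlers[Ping]
```

The point of the pattern is that the module object ends up holding a lookup table from message types to functions, so a dispatcher can later find the right handler for any incoming message.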
Now, given the TodoModel and TodoRepository, we are finally ready to implement all the handlers. Let’s go through the handlers one by one:
Command handlers
Below is the handler for creating a new todo. The first argument is the command, and the second is the todo repository. The repository is a dependency: the application provides it when the handler is executed.
@todos.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, repo: TodoRepository):
    new_todo = TodoModel(
        id=command.todo_id,
        title=command.title,
        description=command.description,
        due_at=command.due_at,
    )
    repo.add(new_todo)
One of the core features of Lato is the ability to automatically inject handler dependencies - see Dependency Injection for more details.
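As a rough intuition for what automatic injection involves, the sketch below inspects a handler's signature and fills in every parameter after the message from a dictionary of known dependencies. The helper call_with_dependencies and the handler handle_greeting are hypothetical, and matching purely by parameter name is an assumption made for this illustration; Lato's actual resolution rules are covered in Dependency Injection.

```python
import inspect
from typing import Any, Callable, Dict


def call_with_dependencies(
    func: Callable, message: Any, dependencies: Dict[str, Any]
) -> Any:
    """Call func(message, ...) and fill the remaining parameters by name."""
    parameter_names = list(inspect.signature(func).parameters)
    # The first parameter receives the message; the rest are looked up by name.
    # A missing dependency raises KeyError in this toy version.
    kwargs = {name: dependencies[name] for name in parameter_names[1:]}
    return func(message, **kwargs)


def handle_greeting(command: str, repo: list) -> int:
    """Hypothetical handler: store the command and report the repo size."""
    repo.append(command)
    return len(repo)


fake_repo: list = []
available = {"repo": fake_repo, "unused": object()}
count = call_with_dependencies(handle_greeting, "hello", available)
```

Only the dependencies that the handler names in its signature are passed in, so handlers stay free of any wiring code and are easy to call directly in tests.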
Publishing events
The command handler for completing a todo follows a similar pattern. However, it introduces two additional dependencies:
@todos.handler(CompleteTodo)
def handle_complete_todo(
    command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
    a_todo = repo.get_by_id(command.todo_id)
    a_todo.mark_as_completed(now)
    ctx.publish(TodoWasCompleted(todo_id=a_todo.id))
The dependencies are:
ctx - a TransactionContext object. The transaction context is a core concept in Lato, responsible for managing the context in which messages are processed. Here we use it to publish an event.
now - the current datetime.
The ctx.publish method publishes the event to any subscriber interested in processing it. In this case we are emitting TodoWasCompleted, which could be handled both by an Analytics module, to update productivity stats, and by a Notifications module (e.g. to send an email).
Note
Events play a crucial role in achieving loose coupling by allowing communication between different parts of the system without direct dependencies. When a handler publishes an event, it doesn’t need to know which other handlers are interested in that event. Thus, the sender and the receivers are decoupled.
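The decoupling described in the note can be sketched with a tiny in-memory publish/subscribe bus. MiniBus, the two lambda subscribers, and the integer todo_id are all hypothetical simplifications for this illustration; in Lato the publishing is done through ctx.publish, and this sketch does not reproduce its internals.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Type


@dataclass(frozen=True)
class TodoWasCompleted:
    """Simplified stand-in for the tutorial's event."""

    todo_id: int


class MiniBus:
    """Toy event bus: the publisher never sees who is subscribed."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: Type, handler: Callable) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # Deliver the event to every handler registered for its type.
        for handler in self._subscribers[type(event)]:
            handler(event)


log: List[str] = []
bus = MiniBus()
# Two independent "modules" subscribe to the same event.
bus.subscribe(TodoWasCompleted, lambda e: log.append(f"analytics: todo {e.todo_id}"))
bus.subscribe(TodoWasCompleted, lambda e: log.append(f"notifications: todo {e.todo_id}"))

bus.publish(TodoWasCompleted(todo_id=7))
```

Adding a third subscriber requires no change to the code that publishes the event, which is the practical benefit of this style of communication.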
Query handlers
The role of query handlers is to retrieve data without changing the state of the application. By separating the query handling logic from the command handling logic, we can optimize data retrieval and achieve a higher degree of decoupling between the components responsible for reading and writing.
Here is the handler for retrieving all todos:
@todos.handler(GetAllTodos)
def get_all_todos(
    query: GetAllTodos, repo: TodoRepository, now: datetime
) -> list[TodoReadModel]:
    result = repo.get_all()
    return [todo_model_to_read_model(todo, now) for todo in result]
and the handler for retrieving either completed or not completed todos, depending on the query payload:
@todos.handler(GetSomeTodos)
def get_some_todos(query: GetSomeTodos, repo: TodoRepository, now: datetime):
    if query.completed is None:
        result = repo.get_all()
    else:
        result = (
            repo.get_all_completed()
            if query.completed
            else repo.get_all_not_completed()
        )
    return [todo_model_to_read_model(todo, now) for todo in result]
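The handler above treats the completed field as a three-state filter: None selects everything, True selects completed todos, and False selects the rest. The following sketch isolates that logic. The shape of GetSomeTodos shown here is an assumption (the real query is defined in the queries module), and Item and select are hypothetical stand-ins for TodoModel and the handler.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class GetSomeTodos:
    """Assumed shape of the query; the real class lives in queries.py."""

    completed: Optional[bool] = None


@dataclass
class Item:
    """Minimal stand-in for TodoModel."""

    title: str
    is_completed: bool


def select(items: List[Item], query: GetSomeTodos) -> List[str]:
    """Return the titles of the items matching the three-state filter."""
    if query.completed is None:
        chosen = items  # no filter requested
    else:
        chosen = [item for item in items if item.is_completed == query.completed]
    return [item.title for item in chosen]


items = [Item("write docs", True), Item("ship release", False)]
all_titles = select(items, GetSomeTodos())
done_titles = select(items, GetSomeTodos(completed=True))
open_titles = select(items, GetSomeTodos(completed=False))
```

Checking explicitly for None matters here: a plain truthiness test would treat completed=False the same as "no filter".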
It’s worth noting that both query handlers return read models. Here is the implementation of mapping from a domain model to a read model:
def todo_model_to_read_model(todo: TodoModel, now: datetime) -> TodoReadModel:
    return TodoReadModel(
        id=todo.id,
        title=todo.title,
        description=todo.description,
        is_due=todo.is_due(now),
        is_completed=todo.is_completed,
    )
Let’s wrap it up:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from uuid import UUID

from commands import CompleteTodo, CreateTodo
from events import TodoWasCompleted
from queries import GetAllTodos, GetSomeTodos, GetTodoDetails

from lato import ApplicationModule, TransactionContext

todos = ApplicationModule("todos")


@dataclass
class TodoModel:
    """Model representing a todo"""

    id: UUID
    title: str
    description: str = ""
    due_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def is_completed(self) -> bool:
        return self.completed_at is not None

    def is_due(self, now: datetime) -> bool:
        # A todo without a due date, or one that is already completed, is never due.
        if self.due_at is None or self.is_completed:
            return False
        return self.due_at < now

    def mark_as_completed(self, when: datetime) -> None:
        self.completed_at = when


@dataclass
class TodoReadModel:
    """Read model exposed to the external world"""

    id: UUID
    title: str
    description: str
    is_due: bool
    is_completed: bool


def todo_model_to_read_model(todo: TodoModel, now: datetime) -> TodoReadModel:
    return TodoReadModel(
        id=todo.id,
        title=todo.title,
        description=todo.description,
        is_due=todo.is_due(now),
        is_completed=todo.is_completed,
    )


class TodoRepository:
    """A repository of todos"""

    def __init__(self):
        self.items: list[TodoModel] = []

    def add(self, item: TodoModel) -> None:
        self.items.append(item)

    def get_by_id(self, todo_id: UUID) -> TodoModel:
        for item in self.items:
            if item.id == todo_id:
                return item
        raise ValueError(f"Todo with id {todo_id} does not exist")

    def get_all(self) -> list[TodoModel]:
        return self.items

    def get_all_completed(self):
        return [todo for todo in self.items if todo.is_completed]

    def get_all_not_completed(self):
        return [todo for todo in self.items if not todo.is_completed]


@todos.handler(CreateTodo)
def handle_create_todo(command: CreateTodo, repo: TodoRepository):
    new_todo = TodoModel(
        id=command.todo_id,
        title=command.title,
        description=command.description,
        due_at=command.due_at,
    )
    repo.add(new_todo)


@todos.handler(CompleteTodo)
def handle_complete_todo(
    command: CompleteTodo, repo: TodoRepository, ctx: TransactionContext, now: datetime
):
    a_todo = repo.get_by_id(command.todo_id)
    a_todo.mark_as_completed(now)
    ctx.publish(TodoWasCompleted(todo_id=a_todo.id))


@todos.handler(GetTodoDetails)
def get_todo_details(query: GetTodoDetails, repo: TodoRepository, now: datetime):
    a_todo = repo.get_by_id(query.todo_id)
    return todo_model_to_read_model(a_todo, now)


@todos.handler(GetAllTodos)
def get_all_todos(
    query: GetAllTodos, repo: TodoRepository, now: datetime
) -> list[TodoReadModel]:
    result = repo.get_all()
    return [todo_model_to_read_model(todo, now) for todo in result]


@todos.handler(GetSomeTodos)
def get_some_todos(query: GetSomeTodos, repo: TodoRepository, now: datetime):
    if query.completed is None:
        result = repo.get_all()
    else:
        result = (
            repo.get_all_completed()
            if query.completed
            else repo.get_all_not_completed()
        )
    return [todo_model_to_read_model(todo, now) for todo in result]