Transaction¶
The transaction module addresses the lack of an explicit way to manage our database transactions. You can read more about why in the design document.
Usage¶
There are 2 main ways to use the transaction layer: with a context manager and with a decorator.
Context Manager¶
The context manager is the simplest way to get a session and act within a transaction.
from shared.transaction import transaction

with transaction() as session:
    session.add(User(name="Marcel"))
It guarantees that everything inside the context commits together or fails together. Note that an explicit call to commit inside the context manager is forbidden, because the context manager handles it.
This code commits automatically as soon as you exit the context manager. If a failure occurs, the session is rolled back.
However, if this code is itself contained in another transaction, the commit will occur only when exiting the outermost context:
def create_user():
    with transaction() as session:
        session.add(User(name="Marcel"))

def complex_operation():
    with transaction() as session:
        ...  # some code
        create_user()  # does not commit
        ...  # some more code
    # commits!
Decorator¶
The module provides a @transactional decorator that behaves the same way as the context manager. Decorated functions must take a session as their first positional argument, which is injected automatically. This constraint comes from typing, and from ParamSpec specifically.
from shared.transaction import transactional
@transactional
def create_user(session: Session):
    session.add(User(name="Marcel"))
Propagation¶
Propagation is a key concept in this transaction module: it lets you choose the behavior you want. Four propagations are currently supported.
REQUIRED¶
This is the default behavior
I am OK with being associated with a parent transaction
As shown above, the commit and rollback decision is left to the outermost transaction. This ensures strong consistency within the context, whatever the nesting level you're at.
REQUIRES_NEW¶
I need my own transaction
Sometimes, you may want to manage your own transaction and control the commit time. This is a way to ensure strong isolation of your code from the rest of the codebase, for auditing purposes for example.
Any child code that has propagation REQUIRED or NESTED will be included in your transaction.
@transactional(propagation=Propagation.REQUIRES_NEW)
def create_user_and_beneficiary(session: Session):
    session.add(Beneficiary(name="Benedicte"))
    create_user()  # will take part in the new transaction
However, this does not impact the rest of the transaction behavior:
def complex_operation():
    with transaction() as session:
        ...
        create_user_and_beneficiary()  # commits in its own transaction
        ...
    # commits
Note: this will be particularly useful if at some point we want to require each component to manage its own transaction rather than share one transaction across components, since it is just a change of propagation.
NESTED¶
I want to be nested within the parent transaction
Nested transactions are typically used to allow a partial rollback:
@transactional(propagation=Propagation.NESTED)
def apply_discount(session: Session, order_id: UUID, discount_code: str):
    ...

@transactional
def process_order(session: Session, order_id: UUID, item_id: UUID, quantity: int, discount_code: str):
    reserve_inventory(item_id, quantity, session)
    try:
        apply_discount(order_id, discount_code)
    except Exception:
        print("Discount application failed, continuing without discount.")
    place_order(order_id, session)
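Partial rollback is commonly implemented with database savepoints. Whether NESTED maps to savepoints in this module is an assumption here; the sketch below only shows the underlying mechanism, using the standard library `sqlite3` driver and a hypothetical `orders` table.

```python
import sqlite3

# isolation_level=None disables the sqlite3 module's implicit transaction
# handling, so BEGIN / SAVEPOINT / COMMIT below are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, discount INTEGER)")

conn.execute("BEGIN")
conn.execute("INSERT INTO orders (id, discount) VALUES (1, 0)")

conn.execute("SAVEPOINT apply_discount")
try:
    conn.execute("UPDATE orders SET discount = 20 WHERE id = 1")
    raise ValueError("discount code expired")  # simulated failure
except ValueError:
    # Undo only the work done since the savepoint; the INSERT above survives.
    conn.execute("ROLLBACK TO SAVEPOINT apply_discount")
finally:
    conn.execute("RELEASE SAVEPOINT apply_discount")

conn.execute("COMMIT")
rows = conn.execute("SELECT id, discount FROM orders").fetchall()
```

The order is committed with its original discount of 0: the failed update was undone without discarding the enclosing transaction.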
READ_ONLY¶
I only want to query
This will provide you with a ReadOnlySession that prevents typical mutating operations and even prevents commits at the connection level. It's never been safer to not make any changes!
@transactional(propagation=Propagation.READ_ONLY)
def read_discount(session: Session, order_id: UUID, discount_code: str) -> int:
    order = session.scalars(select(Order).where(Order.id == order_id)).one()
    return order.discounts.get(discount_code, 0)
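One way to picture a session that refuses mutations is a thin wrapper that forwards reads and rejects writes. This is a hypothetical sketch, not the actual ReadOnlySession: the class names and the list of blocked methods are assumptions, and it does not cover the connection-level protection mentioned above.

```python
class ReadOnlyError(RuntimeError):
    """Raised when a mutating operation is attempted on a read-only session."""


class ReadOnlySessionSketch:
    """Hypothetical wrapper: forwards reads, rejects writes."""

    # Assumed set of mutating methods, chosen for illustration.
    _BLOCKED = frozenset({"add", "add_all", "delete", "merge", "flush", "commit"})

    def __init__(self, session):
        self._session = session

    def __getattr__(self, name):
        # __getattr__ is only called for attributes not found on the wrapper.
        if name in self._BLOCKED:
            raise ReadOnlyError(f"'{name}' is not allowed on a read-only session")
        return getattr(self._session, name)


class FakeSession:
    """Stand-in for a real session, with one read and one write method."""

    def __init__(self):
        self.rows = {"order-1": {"SUMMER": 10}}

    def get(self, key):
        return self.rows[key]

    def add(self, obj):
        self.rows[obj] = {}


read_only = ReadOnlySessionSketch(FakeSession())
discounts = read_only.get("order-1")  # reads pass through

try:
    read_only.add("order-2")          # writes are rejected
    blocked = False
except ReadOnlyError:
    blocked = True
```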
Events¶
We often want to execute some code once the transaction has completed successfully, typically to send an email. To that end, you can use register_event, which dispatches the event if the current transaction context commits successfully:
from dataclasses import dataclass

from shared.messaging import Message
from shared.transaction import register_event, transactional

@dataclass(frozen=True)
class OnboardingCompleted(Message):
    email: str

@transactional
def onboard_member(session: Session, email: str):
    create_user(email=email)
    register_event(OnboardingCompleted(email=email))
On the subscriber side:
from shared.messaging.broker import get_message_broker

def send_onboarding_success_email(event: OnboardingCompleted):
    ...

message_broker = get_message_broker()
message_broker.subscribe_async(
    OnboardingCompleted,
    send_onboarding_success_email,
    queue_name=LOW_PRIORITY,
)
More documentation on how to consume events is available in shared.messaging.
Backward Compatibility¶
This module is built with backward compatibility in mind, working in harmony with current_session:
- When outside a transaction context, current_session points to the global session.
- This same global session is used for all top level REQUIRED propagations.
To understand transaction's behavior, it's important to know that SQLAlchemy opens a new transaction whenever current_session is interacted with (e.g. execute, add) while none is open, and closes it when commit or rollback is called.
See test_global_session.py for different tests on this behavior.
Clean slate¶
Currently in our codebase, this is almost never the case
When no transaction is open, a transaction with propagation REQUIRED behaves as the outermost transaction context and commits at the end of the context.
@transactional
def create_user(session: Session):
    session.add(User(name="Marcel"))

def complex_operation():
    # assuming transaction is not open at this point (either not interacted with at all or freshly committed / rolled back)
    create_user()  # commits
A transaction has already begun¶
When a transaction has already begun, transaction assumes that the caller decides the final outcome and thus does not commit by default.
def complex_legacy_operation():
    # simulate opening a transaction by making a simple query
    marcels = current_session.scalars(select(User).where(User.name == "Marcel")).all()
    create_user()  # does not commit
    current_session.commit()  # have to do it here and handle rollback as well!
Taking control¶
Alternatively, for the duration of the migration, you can force the management of the existing transaction by setting force_manage to True. Be aware that by doing so you risk committing things that were previously added to current_session.
This is best used in the top level transaction context.
@transactional(force_manage=True)
def create_user(session: Session):
    session.add(User(name="Marcel"))

def complex_legacy_operation():
    marcels = current_session.scalars(select(User).where(User.name == "Marcel")).all()
    create_user()  # commits
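The three situations in this section (clean slate, a transaction already begun, and force_manage) come down to one question: does the context own the transaction? The sketch below models that decision with a hypothetical `FakeSession` and a toy `transaction` that takes the session explicitly; it is an illustration of the rule as described, not the module's implementation.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical session that tracks whether a transaction is open."""

    def __init__(self):
        self.in_transaction = False
        self.commits = 0

    def execute(self, statement):
        self.in_transaction = True  # any interaction opens a transaction

    def commit(self):
        self.in_transaction = False
        self.commits += 1

    def rollback(self):
        self.in_transaction = False


@contextmanager
def transaction(session, force_manage=False):
    """Commit or roll back on exit only if this context owns the transaction."""
    owns = force_manage or not session.in_transaction
    try:
        yield session
    except Exception:
        if owns:
            session.rollback()
        raise
    else:
        if owns:
            session.commit()


# Clean slate: no transaction is open, so the context commits.
clean = FakeSession()
with transaction(clean) as s:
    s.execute("INSERT ...")

# A legacy caller already opened a transaction: the context leaves it alone.
legacy = FakeSession()
legacy.execute("SELECT ...")
with transaction(legacy) as s:
    s.execute("INSERT ...")

# force_manage=True takes over the existing transaction and commits it.
forced = FakeSession()
forced.execute("SELECT ...")
with transaction(forced, force_manage=True) as s:
    s.execute("INSERT ...")
```

In the `legacy` case the transaction is still open after the context exits, which is why the caller has to commit or roll back itself.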
Common pitfalls¶
Mixing sessions¶
Don't reuse ORM objects across different transaction contexts: ultimately, you want each transaction context to be autonomous!
def do_not_do_this():
    with transaction() as session:
        user = current_session.scalars(select(User).where(User.name == "Marcel")).first()
    with transaction() as child_session:
        child_session.delete(user)  # won't work
Instead, you can pass primitive attributes like the user_id around or just use a single transaction context.
Before / After¶
Contracting in Canada¶
Notable changes:
- No need for no_commit_in_session
- No need to pass commit argument around
- No more import of current_session
Before¶
@ca_tracer_wrap()
def initialize_subscription(
    company_id: UUID,
    commit: bool = False,
) -> BaseSubscription:
    from components.contracting.public.subscription.api import (
        initialize_subscription as _initialize_subscription,
    )

    with no_commit_in_session(commit_at_end=commit, rollback_at_end=False):
        subscription_payload = CaHealthSubscriptionPayload()
        current_session.add(subscription_payload)
        current_session.flush()
        return _initialize_subscription(
            SubscriptionScope.ca_health,
            owner_type=OwnerType.COMPANY,
            owner_ref=str(company_id),
            payload_ref=subscription_payload.id,
            commit=False,
        )
After¶
@transactional
@ca_tracer_wrap()
def initialize_subscription(session: Session, company_id: UUID) -> BaseSubscription:
    from components.contracting.public.subscription.api import (
        initialize_subscription as _initialize_subscription,
    )

    subscription_payload = CaHealthSubscriptionPayload()
    session.add(subscription_payload)
    session.flush()
    return _initialize_subscription(
        SubscriptionScope.ca_health,
        owner_type=OwnerType.COMPANY,
        owner_ref=str(company_id),
        payload_ref=subscription_payload.id,
        commit=False,
    )
Full example¶
Use case: At onboarding time, I want to complete personal information and enroll in selected Alan Services.
###############
# shared/alan #
###############
class AlanService(Enum):
    HEALTH = "health"
    PREVOYANCE = "prevoyance"
    PLAY = "play"
    WELLNESS = "wellness"
##########################
# components/affiliation #
##########################
class ServiceEnrollment(AlanBaseModel):
    __tablename__ = "service_enrollments"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    profile_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    service = Column(String(50), nullable=False)
    start_date = Column(DateTime, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

@dataclass(frozen=True)
class MemberEnrolledEvent(Message):
    profile_id: uuid.UUID
    service: AlanService
    start_date: date

# when called directly, with no parent transaction, this will commit at the end
# otherwise, it will commit together with the parent transaction
@transactional
def enroll_in_service(
    session: Session,
    profile_id: uuid.UUID,
    service: AlanService,
    start_date: datetime,
) -> uuid.UUID:
    enrollment = ServiceEnrollment(
        profile_id=profile_id,
        service=service.value,
        start_date=start_date,
    )
    session.add(enrollment)
    session.flush()  # to get the id
    enrollment_id = enrollment.id
    # will be sent out after commit
    register_event(MemberEnrolledEvent(
        profile_id=profile_id,
        service=service,
        start_date=start_date
    ))
    return enrollment_id
#########################
# components/onboarding #
#########################
@dataclass(frozen=True)
class OnboardingCompletedEvent(Message):
    profile_id: uuid.UUID
    services: list[AlanService]

# what we want now is to orchestrate several components/methods in a complete flow
# while still making sure all goes through or none at all
def complete_onboarding(
    profile_id: str,
    address: Address,
    selected_services: list[AlanService],
) -> uuid.UUID:
    start_date = datetime.today()
    with transaction():
        # note: ProfileService automatically integrates with current transaction via TransactionUnitOfWork
        profile_service = ProfileService.create()
        profile_service.change_address(
            profile_id=profile_id,
            address=address
        )
        for service in selected_services:
            # like for ProfileService, this will not commit because we have a parent transaction
            enroll_in_service(
                profile_id=profile_id,
                service=service,
                start_date=start_date
            )
        # will be sent out after commit
        register_event(OnboardingCompletedEvent(
            profile_id=profile_id,
            services=selected_services
        ))
    return profile_id
# we have a route that uses it
@onboarding_blueprint.route("/onboarding/complete", methods=["POST"])
def complete_onboarding_api():
    data = request.get_json()
    profile_id = data["profile_id"]
    address = Address(
        street=data["address"]["street"],
        city=data["address"]["city"],
        postal_code=data["address"]["postal_code"],
        country=data["address"]["country"]
    )
    selected_services = [AlanService(service) for service in data["services"]]
    # before entering this top level business logic method, it's important that the global session is NOT active
    # alternatively, we can make sure `complete_onboarding` always commits by:
    # - using `force_manage=True` to take control of the global session
    # - using `propagation=Propagation.REQUIRES_NEW` to just start with a fresh one every time (recommended)
    # in both cases it means that `complete_onboarding` will always commit and thus cannot be composed further
    # like in a `complete_onboarding_and_do_something` method
    result_profile_id = complete_onboarding(
        profile_id=profile_id,
        address=address,
        selected_services=selected_services
    )
    return jsonify({
        "profile_id": result_profile_id,
        "status": "completed",
        "services": [s.value for s in selected_services]
    })
########################
# components/analytics #
########################
# let's make sure customerio is aware of this fresh onboarding
def send_to_customerio(event: OnboardingCompletedEvent):
    import requests

    profile_service = ProfileService.create()
    profile = profile_service.get_or_raise_profile(event.profile_id)
    requests.post(
        "https://track.customer.io/api/v1/customers",
        json={
            "id": str(event.profile_id),
            "email": profile.email,
            "onboarded_services": [s.value for s in event.services]
        },
        headers={"Authorization": "Bearer CUSTOMERIO_TOKEN"}
    )

# will be processed as an RQ job
message_broker.subscribe_async(
    OnboardingCompletedEvent,
    send_to_customerio,
    queue_name=LOW_PRIORITY,
)
####################
# apps/fr_api/... #
####################