Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions server/polar/customer_meter/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections.abc import Sequence
from decimal import Decimal

import logfire
from sqlalchemy import Select, or_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.strategy_options import contains_eager
Expand Down Expand Up @@ -120,15 +121,18 @@ async def update_customer_meter(
customer.id, meter.id
)

event_repository = EventRepository.from_session(session)
events_statement = await self._get_current_window_events_statement(
session, customer, meter
)
last_event = await event_repository.get_one_or_none(
events_statement.order_by(None)
.order_by(Event.ingested_at.desc())
.limit(1)
)
with logfire.span("build_events_statement"):
event_repository = EventRepository.from_session(session)
events_statement = await self._get_current_window_events_statement(
session, customer, meter
)

with logfire.span("get_last_event"):
last_event = await event_repository.get_one_or_none(
events_statement.order_by(None)
.order_by(Event.ingested_at.desc())
.limit(1)
)

if last_event is None:
return customer_meter, False
Expand All @@ -141,18 +145,21 @@ async def update_customer_meter(
if customer_meter.last_balanced_event_id == last_event.id:
return customer_meter, False

usage_events_statement = events_statement.with_only_columns(Event.id).where(
Event.source == EventSource.user
)
usage_units = await meter_service.get_quantity(
session, meter, usage_events_statement
)
with logfire.span("get_usage_units"):
usage_events_statement = events_statement.with_only_columns(
Event.id
).where(Event.source == EventSource.user)
usage_units = await meter_service.get_quantity(
session, meter, usage_events_statement
)
customer_meter.consumed_units = Decimal(usage_units)

credit_events_statement = events_statement.where(
Event.is_meter_credit.is_(True)
)
credit_events = await event_repository.get_all(credit_events_statement)
with logfire.span("get_credit_events"):
credit_events_statement = events_statement.where(
Event.is_meter_credit.is_(True)
)
credit_events = await event_repository.get_all(credit_events_statement)

credited_units = non_negative_running_sum(
event.user_metadata["units"] for event in credit_events
)
Expand All @@ -162,7 +169,8 @@ async def update_customer_meter(
)
customer_meter.last_balanced_event = last_event

return await repository.update(customer_meter), True
with logfire.span("update_customer_meter"):
return await repository.update(customer_meter), True

async def get_rollover_units(
self, session: AsyncSession, customer: Customer, meter: Meter
Expand Down
Loading