Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- CreateTable
CREATE TABLE "LiteLLM_DailyOrganizationSpend" (
"id" TEXT NOT NULL,
"organization_id" TEXT,
"date" TEXT NOT NULL,
"api_key" TEXT NOT NULL,
"model" TEXT,
"model_group" TEXT,
"custom_llm_provider" TEXT,
"mcp_namespaced_tool_name" TEXT,
"prompt_tokens" BIGINT NOT NULL DEFAULT 0,
"completion_tokens" BIGINT NOT NULL DEFAULT 0,
"cache_read_input_tokens" BIGINT NOT NULL DEFAULT 0,
"cache_creation_input_tokens" BIGINT NOT NULL DEFAULT 0,
"spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0,
"api_requests" BIGINT NOT NULL DEFAULT 0,
"successful_requests" BIGINT NOT NULL DEFAULT 0,
"failed_requests" BIGINT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,

CONSTRAINT "LiteLLM_DailyOrganizationSpend_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "LiteLLM_DailyOrganizationSpend_date_idx" ON "LiteLLM_DailyOrganizationSpend"("date");

-- CreateIndex
CREATE INDEX "LiteLLM_DailyOrganizationSpend_organization_id_idx" ON "LiteLLM_DailyOrganizationSpend"("organization_id");

-- CreateIndex
CREATE INDEX "LiteLLM_DailyOrganizationSpend_api_key_idx" ON "LiteLLM_DailyOrganizationSpend"("api_key");

-- CreateIndex
CREATE INDEX "LiteLLM_DailyOrganizationSpend_model_idx" ON "LiteLLM_DailyOrganizationSpend"("model");

-- CreateIndex
CREATE INDEX "LiteLLM_DailyOrganizationSpend_mcp_namespaced_tool_name_idx" ON "LiteLLM_DailyOrganizationSpend"("mcp_namespaced_tool_name");

-- CreateIndex
CREATE UNIQUE INDEX "LiteLLM_DailyOrganizationSpend_organization_id_date_api_key_key" ON "LiteLLM_DailyOrganizationSpend"("organization_id", "date", "api_key", "model", "custom_llm_provider", "mcp_namespaced_tool_name");

29 changes: 29 additions & 0 deletions litellm-proxy-extras/litellm_proxy_extras/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,35 @@ model LiteLLM_DailyUserSpend {
@@index([mcp_namespaced_tool_name])
}

// Track daily organization spend metrics per model and key
model LiteLLM_DailyOrganizationSpend {
id String @id @default(uuid())
organization_id String?
date String
api_key String
model String?
model_group String?
custom_llm_provider String?
mcp_namespaced_tool_name String?
prompt_tokens BigInt @default(0)
completion_tokens BigInt @default(0)
cache_read_input_tokens BigInt @default(0)
cache_creation_input_tokens BigInt @default(0)
spend Float @default(0.0)
api_requests BigInt @default(0)
successful_requests BigInt @default(0)
failed_requests BigInt @default(0)
created_at DateTime @default(now())
updated_at DateTime @updatedAt

@@unique([organization_id, date, api_key, model, custom_llm_provider, mcp_namespaced_tool_name])
@@index([date])
@@index([organization_id])
@@index([api_key])
@@index([model])
@@index([mcp_namespaced_tool_name])
}

// Track daily team spend metrics per model and key
model LiteLLM_DailyTeamSpend {
id String @id @default(uuid())
Expand Down
1 change: 1 addition & 0 deletions litellm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer"
REDIS_DAILY_ORG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_org_spend_update_buffer"
REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_tag_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = int(os.getenv("MAX_REDIS_BUFFER_DEQUEUE_COUNT", 100))
MAX_SIZE_IN_MEMORY_QUEUE = int(os.getenv("MAX_SIZE_IN_MEMORY_QUEUE", 10000))
Expand Down
5 changes: 5 additions & 0 deletions litellm/proxy/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2608,6 +2608,7 @@ class SpendLogsPayload(TypedDict):
cache_key: str
request_tags: str # json str
team_id: Optional[str]
organization_id: Optional[str]
end_user: Optional[str]
requester_ip_address: Optional[str]
custom_llm_provider: Optional[str]
Expand Down Expand Up @@ -3544,6 +3545,10 @@ class DailyTeamSpendTransaction(BaseDailySpendTransaction):
team_id: str


class DailyOrganizationSpendTransaction(BaseDailySpendTransaction):
organization_id: str


class DailyUserSpendTransaction(BaseDailySpendTransaction):
user_id: str

Expand Down
122 changes: 118 additions & 4 deletions litellm/proxy/db/db_spend_update_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
DB_CONNECTION_ERROR_TYPES,
BaseDailySpendTransaction,
DailyTagSpendTransaction,
DailyOrganizationSpendTransaction,
DailyTeamSpendTransaction,
DailyUserSpendTransaction,
DBSpendUpdateTransactions,
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(
self.spend_update_queue = SpendUpdateQueue()
self.daily_spend_update_queue = DailySpendUpdateQueue()
self.daily_team_spend_update_queue = DailySpendUpdateQueue()
self.daily_org_spend_update_queue = DailySpendUpdateQueue()
self.daily_tag_spend_update_queue = DailySpendUpdateQueue()

async def update_database(
Expand Down Expand Up @@ -180,7 +182,13 @@ async def update_database(
prisma_client=prisma_client,
)
)

asyncio.create_task(
self.add_spend_log_transaction_to_daily_org_transaction(
payload=payload,
org_id=org_id,
prisma_client=prisma_client,
)
)
asyncio.create_task(
self.add_spend_log_transaction_to_daily_tag_transaction(
payload=payload,
Expand Down Expand Up @@ -460,6 +468,7 @@ async def _commit_spend_updates_to_db_with_redis(
spend_update_queue=self.spend_update_queue,
daily_spend_update_queue=self.daily_spend_update_queue,
daily_team_spend_update_queue=self.daily_team_spend_update_queue,
daily_org_spend_update_queue=self.daily_org_spend_update_queue,
daily_tag_spend_update_queue=self.daily_tag_spend_update_queue,
)

Expand Down Expand Up @@ -502,6 +511,17 @@ async def _commit_spend_updates_to_db_with_redis(
daily_spend_transactions=daily_team_spend_update_transactions,
)

daily_org_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_org_spend_update_transactions_from_redis_buffer()
)
if daily_org_spend_update_transactions is not None:
await DBSpendUpdateWriter.update_daily_org_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_org_spend_update_transactions,
)

daily_tag_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_tag_spend_update_transactions_from_redis_buffer()
)
Expand Down Expand Up @@ -573,6 +593,20 @@ async def _commit_spend_updates_to_db_without_redis_buffer(
daily_spend_transactions=daily_team_spend_update_transactions,
)

################## Daily Organization Spend Update Transactions ##################
# Aggregate all in memory daily org spend transactions and commit to db
daily_org_spend_update_transactions = cast(
Dict[str, DailyOrganizationSpendTransaction],
await self.daily_org_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
)

await DBSpendUpdateWriter.update_daily_org_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_org_spend_update_transactions,
)

################## Daily Tag Spend Update Transactions ##################
# Aggregate all in memory daily tag spend transactions and commit to db
daily_tag_spend_update_transactions = cast(
Expand Down Expand Up @@ -938,6 +972,20 @@ async def _update_daily_spend(
) -> None:
...

@overload
@staticmethod
async def _update_daily_spend(
n_retry_times: int,
prisma_client: PrismaClient,
proxy_logging_obj: ProxyLogging,
daily_spend_transactions: Dict[str, DailyOrganizationSpendTransaction],
entity_type: Literal["org"],
entity_id_field: str,
table_name: str,
unique_constraint_name: str,
) -> None:
...

@overload
@staticmethod
async def _update_daily_spend(
Expand All @@ -962,14 +1010,15 @@ async def _update_daily_spend(
Dict[str, DailyUserSpendTransaction],
Dict[str, DailyTeamSpendTransaction],
Dict[str, DailyTagSpendTransaction],
Dict[str, DailyOrganizationSpendTransaction],
],
entity_type: Literal["user", "team", "tag"],
entity_type: Literal["user", "team", "org", "tag"],
entity_id_field: str,
table_name: str,
unique_constraint_name: str,
) -> None:
"""
Generic function to update daily spend for any entity type (user, team, tag)
Generic function to update daily spend for any entity type (user, team, org, tag)
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception

Expand Down Expand Up @@ -1191,6 +1240,27 @@ async def update_daily_team_spend(
unique_constraint_name="team_id_date_api_key_model_custom_llm_provider_mcp_namespaced_tool_name",
)

@staticmethod
async def update_daily_org_spend(
n_retry_times: int,
prisma_client: PrismaClient,
proxy_logging_obj: ProxyLogging,
daily_spend_transactions: Dict[str, DailyOrganizationSpendTransaction],
):
"""
Batch job to update LiteLLM_DailyOrganizationSpend table using in-memory daily_spend_transactions
"""
await DBSpendUpdateWriter._update_daily_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_spend_transactions,
entity_type="org",
entity_id_field="organization_id",
table_name="litellm_dailyorganizationspend",
unique_constraint_name="organization_id_date_api_key_model_custom_llm_provider_mcp_namespaced_tool_name",
)

@staticmethod
async def update_daily_tag_spend(
n_retry_times: int,
Expand All @@ -1216,13 +1286,15 @@ async def _common_add_spend_log_transaction_to_daily_transaction(
self,
payload: Union[dict, SpendLogsPayload],
prisma_client: PrismaClient,
type: Literal["user", "team", "request_tags"] = "user",
type: Literal["user", "team", "org", "request_tags"] = "user",
) -> Optional[BaseDailySpendTransaction]:
common_expected_keys = ["startTime", "api_key"]
if type == "user":
expected_keys = ["user", *common_expected_keys]
elif type == "team":
expected_keys = ["team_id", *common_expected_keys]
elif type == "org":
expected_keys = ["organization_id", *common_expected_keys]
elif type == "request_tags":
expected_keys = ["request_tags", *common_expected_keys]
else:
Expand Down Expand Up @@ -1354,6 +1426,48 @@ async def add_spend_log_transaction_to_daily_team_transaction(
update={daily_transaction_key: daily_transaction}
)

async def add_spend_log_transaction_to_daily_org_transaction(
self,
payload: SpendLogsPayload,
prisma_client: Optional[PrismaClient] = None,
org_id: Optional[str] = None,
) -> None:
if prisma_client is None:
verbose_proxy_logger.debug(
"prisma_client is None. Skipping writing spend logs to db."
)
return

if org_id is None:
verbose_proxy_logger.debug(
"organization_id is None for request. Skipping incrementing organization spend."
)
return

payload_with_org = cast(
SpendLogsPayload,
{
**payload,
"organization_id": org_id,
},
)

base_daily_transaction = (
await self._common_add_spend_log_transaction_to_daily_transaction(
payload_with_org, prisma_client, "org"
)
)
if base_daily_transaction is None:
return

daily_transaction_key = f"{org_id}_{base_daily_transaction['date']}_{payload_with_org['api_key']}_{payload_with_org['model']}_{payload_with_org['custom_llm_provider']}"
daily_transaction = DailyOrganizationSpendTransaction(
organization_id=org_id, **base_daily_transaction
)
await self.daily_org_spend_update_queue.add_update(
update={daily_transaction_key: daily_transaction}
)

async def add_spend_log_transaction_to_daily_tag_transaction(
self,
payload: SpendLogsPayload,
Expand Down
36 changes: 36 additions & 0 deletions litellm/proxy/db/db_transaction_queue/redis_update_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY,
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
REDIS_DAILY_ORG_SPEND_UPDATE_BUFFER_KEY,
REDIS_UPDATE_BUFFER_KEY,
)
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
from litellm.proxy._types import (
DailyTagSpendTransaction,
DailyTeamSpendTransaction,
DailyUserSpendTransaction,
DailyOrganizationSpendTransaction,
DBSpendUpdateTransactions,
)
from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj
Expand Down Expand Up @@ -104,6 +106,7 @@ async def store_in_memory_spend_updates_in_redis(
spend_update_queue: SpendUpdateQueue,
daily_spend_update_queue: DailySpendUpdateQueue,
daily_team_spend_update_queue: DailySpendUpdateQueue,
daily_org_spend_update_queue: DailySpendUpdateQueue,
daily_tag_spend_update_queue: DailySpendUpdateQueue,
):
"""
Expand Down Expand Up @@ -166,6 +169,9 @@ async def store_in_memory_spend_updates_in_redis(
daily_team_spend_update_transactions = (
await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
daily_org_spend_update_transactions = (
await daily_org_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
daily_tag_spend_update_transactions = (
await daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
Expand Down Expand Up @@ -195,6 +201,12 @@ async def store_in_memory_spend_updates_in_redis(
service_type=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE,
)

await self._store_transactions_in_redis(
transactions=daily_org_spend_update_transactions,
redis_key=REDIS_DAILY_ORG_SPEND_UPDATE_BUFFER_KEY,
service_type=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE,
)

await self._store_transactions_in_redis(
transactions=daily_tag_spend_update_transactions,
redis_key=REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY,
Expand Down Expand Up @@ -329,6 +341,30 @@ async def get_all_daily_team_spend_update_transactions_from_redis_buffer(
),
)

async def get_all_daily_org_spend_update_transactions_from_redis_buffer(
self,
) -> Optional[Dict[str, DailyOrganizationSpendTransaction]]:
"""
Gets all the daily organization spend update transactions from Redis
"""
if self.redis_cache is None:
return None
list_of_transactions = await self.redis_cache.async_lpop(
key=REDIS_DAILY_ORG_SPEND_UPDATE_BUFFER_KEY,
count=MAX_REDIS_BUFFER_DEQUEUE_COUNT,
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
return cast(
Dict[str, DailyOrganizationSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
list_of_daily_spend_update_transactions
),
)

async def get_all_daily_tag_spend_update_transactions_from_redis_buffer(
self,
) -> Optional[Dict[str, DailyTagSpendTransaction]]:
Expand Down
Loading
Loading