-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
1353 lines (1135 loc) · 47.1 KB
/
bot.py
File metadata and controls
1353 lines (1135 loc) · 47.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Core bot composition and FastAPI surface for the Vera judge harness.
This module exposes:
* The deterministic rule-engine composer (``compose``/``compose_async``)
* Pydantic schemas for the 4-context framework (category, merchant,
trigger, customer) plus the ``ComposedMessage`` output contract
* The FastAPI app implementing the 5 judge endpoints
(``/v1/context``, ``/v1/tick``, ``/v1/reply``, ``/v1/healthz``,
``/v1/metadata``) plus an optional ``/v1/teardown``
Production notes:
* Stateless across processes — context lives in an in-memory store, so
a single replica is required for the judge run window.
* LLM refinement is best-effort: any failure falls back silently to the
deterministic draft, keeping ``/v1/tick`` and ``/v1/reply`` within the
judge's 30 second SLA.
"""
import json
import logging
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Literal, TypeVar
import anyio
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from google.genai import types
from pydantic import BaseModel, Field, ValidationInfo, model_validator
load_dotenv()
# IMPORTANT: load_dotenv() must run before importing llm_pool, because
# llm_pool builds its singleton GeminiKeyPool from env at import time.
from llm_pool import COMPOSER_CHAIN, COMPOSER_MODELS, get_pool # noqa: E402
from semantic_matcher import semantic_matcher # noqa: E402
logger = logging.getLogger("vera.bot")
class AllowExtraModel(BaseModel):
"""Base model that permits extra fields from JSON payloads."""
class Config:
"""Pydantic configuration for permissive parsing."""
extra = "allow"
class PeerStats(AllowExtraModel):
"""Peer benchmark statistics for a category."""
scope: str | None = None
avg_rating: float | None = None
avg_review_count: int | None = None
avg_views_30d: int | None = None
avg_calls_30d: int | None = None
avg_directions_30d: int | None = None
avg_ctr: float | None = None
avg_photos: int | None = None
avg_post_freq_days: int | None = None
retention_6mo_pct: float | None = None
class CategoryContext(AllowExtraModel):
"""Category-level context used for voice and benchmarks."""
slug: str
display_name: str | None = None
peer_stats: PeerStats | None = None
offer_catalog: list[dict[str, Any]] | None = None
digest: list[dict[str, Any]] | None = None
class PerformanceSnapshot(AllowExtraModel):
"""Merchant performance metrics snapshot."""
window_days: int | None = None
views: int | None = None
calls: int | None = None
directions: int | None = None
ctr: float | None = None
leads: int | None = None
delta_7d: dict[str, float] | None = None
class MerchantIdentity(AllowExtraModel):
"""Human-facing identity details for the merchant."""
name: str | None = None
owner_first_name: str | None = None
languages: list[str] | None = None
class MerchantContext(AllowExtraModel):
"""Merchant-specific context including identity and performance."""
merchant_id: str
category_slug: str
identity: MerchantIdentity | None = None
performance: PerformanceSnapshot | None = None
class CustomerIdentity(AllowExtraModel):
"""Customer identity and language preferences."""
name: str | None = None
language_pref: str | None = None
class CustomerContext(AllowExtraModel):
"""Customer context for merchant-on-behalf messaging."""
customer_id: str
merchant_id: str
identity: CustomerIdentity | None = None
class TriggerContext(AllowExtraModel):
"""Trigger information driving the next message."""
id: str
scope: str
kind: str
source: str
merchant_id: str | None = None
customer_id: str | None = None
payload: dict[str, Any] = Field(default_factory=dict)
urgency: int | None = None
suppression_key: str | None = None
expires_at: str | None = None
class ComposedMessage(BaseModel):
"""Validated output payload for the judge harness."""
body: str
cta: Literal["yes_no", "open_ended", "none"]
send_as: Literal["vera", "merchant_on_behalf"]
suppression_key: str | None = None
rationale: str
@model_validator(mode="after")
def enforce_cta_position(self, info: ValidationInfo) -> "ComposedMessage":
"""Ensure YES/STOP CTA is placed at the end when required."""
if self.cta == "yes_no":
if not re.search(r"\b(YES|STOP)\b\s*\.?$", self.body, flags=re.IGNORECASE):
raise ValueError("YES/STOP CTA must be the final sentence.")
return self
@model_validator(mode="after")
def guard_promotional_tone(self, info: ValidationInfo) -> "ComposedMessage":
"""Reject promotional language for clinical categories."""
category = None
if info.context:
category = info.context.get("category")
if category and getattr(category, "slug", None) == "dentists":
if re.search(r"AMAZING|BEST DEAL|HURRY", self.body, flags=re.IGNORECASE):
raise ValueError("Promotional tone detected for dentists category.")
return self
@model_validator(mode="after")
def validate_language_mix(self, info: ValidationInfo) -> "ComposedMessage":
"""Validate required Hindi-English code mix when specified."""
if not info.context:
return self
language_pref = info.context.get("language_pref")
if not language_pref:
return self
if "hi" in language_pref:
has_hindi = re.search(
r"\b(namaste|aap|kripya|bataiye|haan|ji)\b",
self.body,
flags=re.IGNORECASE,
)
has_english = re.search(
r"\b(hi|hello|context|ready|continue)\b",
self.body,
flags=re.IGNORECASE,
)
if not (has_hindi and has_english):
raise ValueError("Expected Hindi-English code mix in body.")
return self
@model_validator(mode="after")
def validate_referenced_facts(self, info: ValidationInfo) -> "ComposedMessage":
"""Validate referenced prices against available offers."""
if not info.context:
return self
category = info.context.get("category")
merchant = info.context.get("merchant")
body = self.body
price_mentions = re.findall(r"₹\s?(\d+(?:\.\d+)?)", body)
if price_mentions:
offers = []
if category and getattr(category, "offer_catalog", None):
offers.extend([offer.get("title", "") for offer in category.offer_catalog])
merchant_offers = getattr(merchant, "offers", None)
if isinstance(merchant_offers, list):
offers.extend(
[offer.get("title", "") for offer in merchant_offers if isinstance(offer, dict)]
)
if not any(any(price in title for title in offers) for price in price_mentions):
raise ValueError("Price mentioned without matching offer catalog.")
return self
ModelType = TypeVar("ModelType", bound=BaseModel)
def _validate(model_cls: type[ModelType], raw: dict[str, Any]) -> ModelType:
if hasattr(model_cls, "model_validate"):
return model_cls.model_validate(raw)
return model_cls.parse_obj(raw)
def _context_version_check(merchant: MerchantContext, trigger: TriggerContext) -> None:
merchant_version = getattr(merchant, "context_version", None)
trigger_version = None
if isinstance(trigger.payload, dict):
trigger_version = trigger.payload.get("context_version")
if merchant_version is not None and trigger_version is not None:
if merchant_version != trigger_version:
raise ValueError("Stale context detected; please refresh /v1/context.")
def _determine_send_as(
trigger: TriggerContext,
customer: CustomerContext | None,
) -> Literal["vera", "merchant_on_behalf"]:
if trigger.scope == "customer" and customer is not None:
return "merchant_on_behalf"
return "vera"
def _language_pref(merchant: MerchantContext, customer: CustomerContext | None) -> str:
"""Infer a language preference for the outgoing message."""
if customer and customer.identity and customer.identity.language_pref:
return customer.identity.language_pref
if merchant.identity and merchant.identity.languages:
if "hi" in merchant.identity.languages:
return "hi-en mix"
return "en"
def _last_merchant_message(conversation_history: list[dict[str, Any]] | None) -> str | None:
"""Return the most recent merchant-authored message from history."""
if not conversation_history:
return None
for entry in reversed(conversation_history):
body = entry.get("body")
if entry.get("from") == "merchant" and isinstance(body, str):
return body.strip()
return None
def _auto_reply_detected(conversation_history: list[dict[str, Any]] | None) -> bool:
"""Detect auto-reply messages.
Detection strategy:
1. Repeated identical merchant messages → always auto-reply.
2. Delegate to semantic_matcher (Regex -> BGE-M3 -> Gemma-3 LLM).
"""
if not conversation_history:
return False
merchant_messages = [
entry.get("body", "")
for entry in conversation_history
if entry.get("from") == "merchant" and isinstance(entry.get("body"), str)
]
if not merchant_messages:
return False
# Strategy 1: identical repeated messages
if len(merchant_messages) >= 2 and merchant_messages[-1] == merchant_messages[-2]:
return True
last_message = merchant_messages[-1]
# Strategy 2: Strict Classification Pipeline
return semantic_matcher.is_auto_reply(last_message)
def _intent_transition_detected(message: str | None) -> bool:
"""Detect explicit intent to join or proceed.
Detection strategy:
Delegate to semantic_matcher (Regex -> BGE-M3 -> Gemma-3 LLM).
"""
if not message:
return False
return semantic_matcher.is_intent_transition(message)
async def _auto_reply_detected_async(
conversation_history: list[dict[str, Any]] | None,
) -> bool:
"""Async wrapper for auto-reply detection with LLM calls."""
return await anyio.to_thread.run_sync(
_auto_reply_detected,
conversation_history,
)
async def _intent_transition_detected_async(message: str | None) -> bool:
"""Async wrapper for intent transition detection with LLM calls."""
return await anyio.to_thread.run_sync(
_intent_transition_detected,
message,
)
async def _intent_type_async(message: str) -> str:
"""Async wrapper for intent classification with LLM calls."""
return await anyio.to_thread.run_sync(
semantic_matcher.get_intent_type,
message,
)
def _format_pct(value: float) -> str:
"""Format a ratio as a percent string with one decimal place."""
return f"{value * 100:.1f}%"
def _benchmark_facts(
merchant: MerchantContext,
category: CategoryContext,
) -> dict[str, str]:
"""Extract performance facts and peer comparisons for Stage 3."""
facts: dict[str, str] = {}
perf = merchant.performance
peer = category.peer_stats
if perf and perf.views is not None:
facts["views"] = f"{perf.views} views"
if perf and perf.ctr is not None:
facts["ctr"] = _format_pct(perf.ctr)
if perf and perf.calls is not None:
facts["calls"] = f"{perf.calls} calls"
if perf and peer and perf.ctr is not None and peer.avg_ctr is not None:
facts["ctr_gap"] = f"{_format_pct(perf.ctr)} vs peer {_format_pct(peer.avg_ctr)}"
return facts
def _research_digest_anchor(
trigger: TriggerContext,
category: CategoryContext,
) -> dict[str, str] | None:
"""Extract research digest metadata from trigger payload or category digest."""
if "research_digest" not in trigger.kind:
return None
payload_item = None
if isinstance(trigger.payload, dict):
payload_item = trigger.payload.get("top_item")
if payload_item:
return {
"title": payload_item.get("title", ""),
"source": payload_item.get("source", ""),
"trial_n": str(payload_item.get("trial_n", "")),
}
if category.digest:
first_item = category.digest[0]
return {
"title": first_item.get("title", ""),
"source": first_item.get("source", ""),
"trial_n": str(first_item.get("trial_n", "")),
}
return None
LEVER_MAP: dict[str, str] = {
# Loss aversion — best for performance dips or missed opportunities
"perf_dip": "loss_aversion",
"missed_search": "loss_aversion",
"dormant_with_vera": "loss_aversion",
"renewal_due": "loss_aversion",
"seasonal_acquisition_dip": "loss_aversion",
"winback": "loss_aversion",
"customer_lapsed_soft": "loss_aversion",
# Social proof — best for social-facing triggers or benchmarks
"milestone_reached": "social_proof",
"review_theme_emerged": "social_proof",
"competitor_opened": "social_proof",
"perf_spike": "social_proof",
"festival_upcoming": "social_proof",
# Effort externalization — best for "I've drafted X — just say go"
"research_digest": "effort_externalization",
"curious_ask_due": "effort_externalization",
"trial_followup": "effort_externalization",
"appointment_tomorrow": "effort_externalization",
"recall_due": "effort_externalization",
"chronic_refill_due": "effort_externalization",
"unverified_gbp": "effort_externalization",
}
"""O(1) trigger-kind → compulsion lever lookup table."""
def _select_compulsion_lever(trigger_kind: str) -> str:
"""Map a trigger kind to a compulsion lever via O(1) dictionary lookup.
Falls back to substring matching for compound trigger kinds (e.g.
'research_digest_release'), and returns 'neutral' when no lever matches.
"""
if trigger_kind in LEVER_MAP:
return LEVER_MAP[trigger_kind]
for key, lever in LEVER_MAP.items():
if key in trigger_kind:
return lever
return "neutral"
ACTION_TRIGGERS: frozenset[str] = frozenset({
"recall_due",
"appointment_tomorrow",
"trial_followup",
"chronic_refill_due",
"renewal_due",
"unverified_gbp",
"winback",
})
"""Trigger kinds that require a binary YES/STOP CTA."""
def _is_action_trigger(trigger_kind: str) -> bool:
"""Return True if the trigger kind requires a binary YES/STOP CTA.
Action-oriented triggers (recall, appointment, trial followup, etc.)
should produce a single binary commitment rather than an open-ended ask.
"""
if trigger_kind in ACTION_TRIGGERS:
return True
return any(key in trigger_kind for key in ACTION_TRIGGERS)
def _enforce_binary_cta(body: str, language_pref: str) -> str:
"""Append a YES/STOP binary CTA to the message body if not present.
Checks whether the body already ends with YES or STOP (case-insensitive).
If not, appends the appropriate CTA suffix based on language preference.
"""
if re.search(
r"\b(YES|STOP)\b\s*\.?$", body, flags=re.IGNORECASE
):
return body
if language_pref.startswith("hi"):
return f"{body} Reply YES to confirm, STOP to cancel. YES"
return f"{body} Reply YES to proceed or STOP to cancel. YES"
_suppression_store: dict[str, str] = {}
"""In-memory store mapping suppression_key → last sent body for dedup."""
def _check_suppression_dedup(
suppression_key: str | None,
body: str,
) -> bool:
"""Check if this body was already sent for the given suppression key.
Returns True if the body is a verbatim repeat (should be suppressed).
Returns False if it is new or the key is None.
Updates the store with the new body after the check.
"""
if not suppression_key:
return False
previous = _suppression_store.get(suppression_key)
_suppression_store[suppression_key] = body
return previous is not None and previous == body
def _apply_compulsion_lever(
message: str,
lever: str,
language_pref: str,
) -> str:
"""Append a lever cue to the message body."""
if lever == "loss_aversion":
if language_pref.startswith("hi"):
return f"{message} Missed demand avoid karne ke liye main ek quick fix bhej du?"
return f"{message} Want me to share a quick fix to avoid missing demand?"
if lever == "social_proof":
if language_pref.startswith("hi"):
return f"{message} Aapke area ke kuch peers ne isi week yeh try kiya hai."
return f"{message} A few peers in your area tried this this week."
if lever == "effort_externalization":
if language_pref.startswith("hi"):
return f"{message} Main draft ready karke bhej sakti hoon — bas YES bol dijiye."
return f"{message} I can draft it for you — just say YES."
return message
VOICE_PREFIX_MAP: dict[str, str] = {
"dentists": "Clinical note:",
"salons": "Quick tip:",
"restaurants": "Quick ops note:",
"gyms": "Coach's note:",
"pharmacies": "Compliance note:",
}
"""Category-slug → message prefix for voice-appropriate framing."""
def _apply_voice_modulation(category: CategoryContext, message: str) -> str:
"""Apply a category-specific tone prefix to the message body.
Uses the VOICE_PREFIX_MAP for O(1) lookup. When the category voice
data is available on the context, we additionally validate that
the prefix is appropriate for the declared tone; the mapping
itself is derived from the category tone definitions in the dataset.
"""
prefix = VOICE_PREFIX_MAP.get(category.slug)
if prefix:
return f"{prefix} {message}"
return message
def _build_rationale(
strategy: str,
lever: str,
category_slug: str,
trigger_kind: str,
) -> str:
"""Build a concise rationale explaining the composition strategy.
Args:
strategy: Which message-construction path was taken.
lever: The compulsion lever applied (or 'none').
category_slug: Category slug for voice reference.
trigger_kind: Trigger kind that drove the message.
Returns:
A 1-2 sentence rationale suitable for the judge harness.
"""
voice_label = VOICE_PREFIX_MAP.get(category_slug, "default")
lever_label = lever.replace("_", " ") if lever != "neutral" else "neutral framing"
parts: list[str] = []
if strategy == "auto_reply_exit":
parts.append("Detected canned auto-reply; routed to graceful exit.")
elif strategy == "intent_transition":
parts.append("Merchant signaled explicit intent; switched to action mode.")
elif strategy == "customer_facing":
parts.append("Customer-scoped trigger; sent as merchant_on_behalf.")
elif strategy == "digest_anchor":
parts.append(
f"Anchored on research digest from {trigger_kind} trigger "
f"using {lever_label} lever."
)
elif strategy == "benchmark_anchor":
parts.append(
f"Anchored on peer-median CTR benchmark to drive curiosity "
f"through {lever_label}."
)
else:
parts.append(
f"Fallback path with {lever_label} lever applied."
)
parts.append(f"Voice: {voice_label} ({category_slug}).")
return " ".join(parts)
# ---------------------------------------------------------------------------
# Modular prompt template system
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """\
You are Vera, a merchant-AI assistant on magicpin.
You help local merchants (dentists, salons, restaurants, gyms, pharmacies) \
grow their business via WhatsApp.
Rules:
1. Output valid JSON matching: {body, cta, send_as, rationale}.
2. cta must be one of: "yes_no", "open_ended", "none".
3. send_as must be one of: "vera", "merchant_on_behalf".
4. If cta is "yes_no", the body MUST end with YES or STOP.
5. Never fabricate facts, prices, or sources not in the input.
6. Keep the voice prefix at the start of the body if provided.
7. Body should be concise (2-4 sentences max).
"""
LEVER_TEMPLATES: dict[str, str] = {
"social_proof": """\
Write a message using SOCIAL PROOF framing.
Reference how peers or competitors in the merchant's area are performing.
Use phrases like "X peers in your locality", "others in your area".
Make the merchant curious about what others are doing.
""",
"loss_aversion": """\
Write a message using LOSS AVERSION framing.
Highlight what the merchant is missing or at risk of losing.
Use phrases like "missing X demand", "gap vs peer median", \
"before this window closes".
Create urgency without being promotional.
""",
"effort_externalization": """\
Write a message using EFFORT EXTERNALIZATION framing.
Offer to do the work for the merchant — "I've drafted X", \
"I can set this up", "just say YES".
Minimize perceived effort for the merchant.
""",
"neutral": """\
Write a helpful, peer-toned update message.
Be specific and fact-anchored but without a strong persuasion lever.
""",
}
"""Lever-specific prompt fragments injected into the LLM call."""
def _extract_jit_facts(
merchant_ctx: MerchantContext,
category_ctx: CategoryContext,
trigger_ctx: TriggerContext,
customer_ctx: CustomerContext | None,
benchmark: dict[str, str],
digest: dict[str, str] | None,
) -> dict[str, Any]:
"""Extract only the facts the LLM needs — no full context dumps.
Returns a small dict of concrete, verifiable data points sourced
from the four context objects and the pre-computed benchmark/digest.
This keeps the prompt token-efficient and reduces hallucination risk.
"""
facts: dict[str, Any] = {}
# Merchant identity
if merchant_ctx.identity:
facts["merchant_name"] = (
merchant_ctx.identity.owner_first_name
or merchant_ctx.identity.name
or "there"
)
else:
facts["merchant_name"] = "there"
# Performance numbers
if merchant_ctx.performance:
perf = merchant_ctx.performance
if perf.views is not None:
facts["views_30d"] = perf.views
if perf.ctr is not None:
facts["ctr"] = f"{perf.ctr * 100:.1f}%"
if perf.calls is not None:
facts["calls_30d"] = perf.calls
# Peer benchmarks
if benchmark:
facts["benchmark"] = benchmark
# Category voice
facts["voice_prefix"] = VOICE_PREFIX_MAP.get(
category_ctx.slug, ""
)
facts["category_slug"] = category_ctx.slug
conversation_history = getattr(merchant_ctx, "conversation_history", None)
if conversation_history:
facts["recent_conversation"] = conversation_history[-6:]
# Digest anchor
if digest:
facts["digest_title"] = digest.get("title", "")
facts["digest_source"] = digest.get("source", "")
facts["digest_trial_n"] = digest.get("trial_n", "")
# Trigger metadata
facts["trigger_kind"] = trigger_ctx.kind
# Customer identity (for customer-facing sends)
if customer_ctx and customer_ctx.identity:
facts["customer_name"] = customer_ctx.identity.name or "there"
return facts
def _build_llm_prompt(
lever: str,
language_pref: str,
cta: str,
send_as: str,
facts: dict[str, Any],
draft_body: str,
draft_rationale: str,
) -> str:
lever_template = LEVER_TEMPLATES.get(lever, LEVER_TEMPLATES["neutral"])
language_instruction = (
"Language: Hindi-English code-mix. "
"Blend naturally — use Hindi particles (ji, hai, karo) "
"with English nouns and numbers.\n"
if language_pref.startswith("hi")
else "Language: English.\n"
)
# Build conversation block BEFORE the f-string
recent = facts.pop("recent_conversation", None)
if recent:
history_lines = "\n".join(f" [{e['from']}]: {e['body']}" for e in recent)
last_message = recent[-1]["body"] if recent[-1]["from"] == "merchant" else ""
conversation_block = (
f"\nConversation so far:\n{history_lines}\n"
f"\nThe merchant's last message was: \"{last_message}\"\n"
"Respond directly to what they said, staying in character as Vera.\n"
)
else:
conversation_block = ""
# Build facts block AFTER popping recent_conversation
facts_block = "\n".join(f"- {key}: {value}" for key, value in facts.items())
return f"""{SYSTEM_PROMPT}
{lever_template}
{conversation_block}
{language_instruction}
Constraints:
- CTA mode: {cta}
- Send as: {send_as}
Extracted facts (use ONLY these):
{facts_block}
Draft body from rule engine: "{draft_body}"
Draft rationale: "{draft_rationale}"
Refine the draft body using the lever template and facts above.
Return a JSON object with keys: body, cta, send_as, rationale."""
async def compose_async(
category: dict[str, Any],
merchant: dict[str, Any],
trigger: dict[str, Any],
customer: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Compose a message payload using async LLM calls."""
category_ctx: CategoryContext = _validate(CategoryContext, category)
merchant_ctx: MerchantContext = _validate(MerchantContext, merchant)
trigger_ctx: TriggerContext = _validate(TriggerContext, trigger)
customer_ctx: CustomerContext | None = (
_validate(CustomerContext, customer) if customer else None
)
if merchant_ctx.category_slug != category_ctx.slug:
raise ValueError("Category slug mismatch between merchant and category contexts.")
_context_version_check(merchant_ctx, trigger_ctx)
send_as = _determine_send_as(trigger_ctx, customer_ctx)
language_pref = _language_pref(merchant_ctx, customer_ctx)
merchant_name = None
if merchant_ctx.identity:
merchant_name = merchant_ctx.identity.owner_first_name or merchant_ctx.identity.name
merchant_name = merchant_name or "there"
conversation_history = getattr(merchant_ctx, "conversation_history", None)
last_merchant_message = _last_merchant_message(conversation_history)
strategy = "fallback"
lever = "neutral"
benchmark: dict[str, str] = {}
digest: dict[str, str] | None = None
if await _auto_reply_detected_async(conversation_history):
strategy = "auto_reply_exit"
if language_pref.startswith("hi"):
body = (
"Hi, lagta hai yeh auto-reply hai ji. "
"Aapke owner/manager se main direct connect kar leti hoon."
)
else:
body = (
"Hi, this looks like an auto-reply. "
"I'll connect directly with the owner or manager."
)
cta = "none"
elif await _intent_transition_detected_async(last_merchant_message):
strategy = "intent_transition"
if language_pref.startswith("hi"):
body = (
"Great, main aapka onboarding start kar sakti hoon. "
"Proceed karne ke liye YES bol dijiye, STOP for later. STOP"
)
else:
body = (
"Great, I can start your onboarding now. "
"Reply YES to proceed or STOP for later. STOP"
)
cta = "yes_no"
elif send_as == "merchant_on_behalf" and customer_ctx and customer_ctx.identity:
strategy = "customer_facing"
customer_name = customer_ctx.identity.name or "there"
if language_pref.startswith("hi"):
body = (
f"Namaste {customer_name}, {merchant_name} clinic se. "
"Context update ho gaya hai. Aap kab baat karna chaahenge?"
)
else:
body = (
f"Hi {customer_name}, this is {merchant_name}. "
"Context is updated. When would you like to continue?"
)
cta = "open_ended"
else:
benchmark = _benchmark_facts(merchant_ctx, category_ctx)
digest = _research_digest_anchor(trigger_ctx, category_ctx)
lever = _select_compulsion_lever(trigger_ctx.kind)
if digest and digest.get("title"):
strategy = "digest_anchor"
if language_pref.startswith("hi"):
body = (
f"{merchant_name}, naya research digest aaya hai: "
f"{digest['title']}. "
f"Source: {digest.get('source', 'N/A')}. "
"Aap chahen to main 2-min summary bhej du?"
)
else:
body = (
f"{merchant_name}, new research digest: {digest['title']}. "
f"Source: {digest.get('source', 'N/A')}. "
"Want a 2-minute summary?"
)
cta = "open_ended"
elif benchmark:
strategy = "benchmark_anchor"
ctr_gap = benchmark.get("ctr_gap")
views = benchmark.get("views")
if language_pref.startswith("hi"):
body = (
f"{merchant_name}, aapke {views or 'latest'} me CTR "
f"{ctr_gap or benchmark.get('ctr', 'N/A')} hai. "
"Chahein to main quick improvement plan bheju?"
)
else:
body = (
f"{merchant_name}, your {views or 'latest'} CTR is "
f"{ctr_gap or benchmark.get('ctr', 'N/A')}. "
"Want a quick improvement plan?"
)
cta = "open_ended"
elif language_pref.startswith("hi"):
body = (
f"Namaste {merchant_name}, context update ho gaya hai. "
"Jab aap ready ho, bataiye."
)
cta = "open_ended"
else:
body = (
f"Hi {merchant_name}, context is updated. "
"Let me know when you're ready to continue."
)
cta = "open_ended"
if cta == "open_ended":
body = _apply_compulsion_lever(body, lever, language_pref)
body = _apply_voice_modulation(category_ctx, body)
# Binary CTA enforcement for action-oriented triggers
if _is_action_trigger(trigger_ctx.kind):
body = _enforce_binary_cta(body, language_pref)
cta = "yes_no"
rationale = _build_rationale(
strategy=strategy,
lever=lever,
category_slug=category_ctx.slug,
trigger_kind=trigger_ctx.kind,
)
message_dict = {
"body": body,
"cta": cta,
"send_as": send_as,
"suppression_key": trigger_ctx.suppression_key,
"rationale": rationale,
}
if COMPOSER_CHAIN.is_available():
jit_facts = _extract_jit_facts(
merchant_ctx=merchant_ctx,
category_ctx=category_ctx,
trigger_ctx=trigger_ctx,
customer_ctx=customer_ctx,
benchmark=benchmark,
digest=digest,
)
prompt = _build_llm_prompt(
lever=lever,
language_pref=language_pref,
cta=cta,
send_as=send_as,
facts=jit_facts,
draft_body=body,
draft_rationale=rationale,
)
try:
response, model_used = await COMPOSER_CHAIN.generate_content(
contents=prompt,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=ComposedMessage,
temperature=0.0,
),
)
logger.debug("Composer used model %s", model_used)
if response.text:
llm_dict = json.loads(response.text)
llm_dict["suppression_key"] = trigger_ctx.suppression_key
message_dict = llm_dict
except Exception:
logger.warning(
"LLM refinement failed across all keys/models; "
"falling back to deterministic draft",
exc_info=True,
)
# Anti-repetition: check if the body is a verbatim repeat
is_repeat = _check_suppression_dedup(
trigger_ctx.suppression_key,
str(message_dict["body"]),
)
if is_repeat:
message_dict["rationale"] = (
f"[SUPPRESSED REPEAT] {message_dict['rationale']}"
)
message = ComposedMessage.model_validate(
message_dict,
context={
"category": category_ctx,
"merchant": merchant_ctx,
"customer": customer_ctx,
"trigger": trigger_ctx,
"language_pref": language_pref,
},
)
return message.model_dump()
def compose(
category: dict[str, Any],
merchant: dict[str, Any],
trigger: dict[str, Any],
customer: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Sync wrapper around compose_async for compatibility with tests."""
return anyio.run(compose_async, category, merchant, trigger, customer)
# =============================================================================
# Judge Simulator API (FastAPI)
# =============================================================================
START_TIME = time.time()
_context_store: dict[str, dict[str, Any]] = {
"category": {},
"merchant": {},
"customer": {},
"trigger": {},
}
VALID_SCOPES: frozenset[str] = frozenset({"category", "merchant", "customer", "trigger"})
class ContextPush(BaseModel):
"""Schema for /v1/context payloads pushed by the judge."""
scope: str
context_id: str
version: int
payload: dict[str, Any]
delivered_at: str
def _validate_scope(scope: str) -> bool:
"""Return True if scope is one of the supported context scopes."""
return scope in VALID_SCOPES
def _utc_now_iso() -> str:
"""Return the current UTC time as a Z-suffixed ISO-8601 string."""
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _sse_event(event: str, data: Any) -> str:
"""Format a Server-Sent Event message with JSON payload."""
payload = json.dumps(data, ensure_ascii=False)
return f"event: {event}\ndata: {payload}\n\n"
def _stream_single(event: str, data: Any) -> StreamingResponse:
"""Stream a single SSE event (used for simple endpoints)."""
async def generator() -> AsyncIterator[str]:
yield _sse_event(event, data)