Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions reference/identity-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"encoding/hex"
"fmt"
"math"
"strconv"
"time"

"github.com/adcontextprotocol/adcp-go/tmp"
"github.com/redis/go-redis/v9"
)

// FrequencyRule defines a sliding window frequency cap.
Expand All @@ -33,15 +33,15 @@ type CampaignConfig struct {
FrequencyRules []FrequencyRule // Campaign-level caps (all must pass)
}

// IdentityAgent evaluates user eligibility using Valkey/Redis.
// IdentityAgent evaluates user eligibility using a Store (Redis or in-memory).
type IdentityAgent struct {
rdb *redis.Client
store Store
packages map[string]PackageConfig
campaigns map[string]CampaignConfig
}

// NewIdentityAgent creates an agent with the given Redis client and configs.
func NewIdentityAgent(rdb *redis.Client, packages []PackageConfig, campaigns []CampaignConfig) *IdentityAgent {
// NewIdentityAgent creates an agent with the given store and configs.
func NewIdentityAgent(store Store, packages []PackageConfig, campaigns []CampaignConfig) *IdentityAgent {
pkgMap := make(map[string]PackageConfig, len(packages))
for _, p := range packages {
pkgMap[p.PackageID] = p
Expand All @@ -50,7 +50,7 @@ func NewIdentityAgent(rdb *redis.Client, packages []PackageConfig, campaigns []C
for _, c := range campaigns {
campMap[c.CampaignID] = c
}
return &IdentityAgent{rdb: rdb, packages: pkgMap, campaigns: campMap}
return &IdentityAgent{store: store, packages: pkgMap, campaigns: campMap}
}

// IdentityMatch evaluates a user against all requested packages.
Expand Down Expand Up @@ -130,8 +130,7 @@ func (a *IdentityAgent) IdentityMatch(ctx context.Context, req *tmp.IdentityMatc
}

// Expose records that a user was shown an ad for a package.
// Adds a timestamped entry to sorted sets for both package and campaign frequency.
// Uses sorted sets for sliding window frequency capping.
// Uses pipeline to batch Redis commands for efficiency.
func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tmp.ExposeResponse, error) {
tokenHash := hashToken(req.UserToken)
pkg, ok := a.packages[req.PackageID]
Expand All @@ -141,14 +140,13 @@ func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tm

now := time.Now()
ts := float64(now.UnixMilli())
member := fmt.Sprintf("%d:%s", now.UnixNano(), req.PackageID) // Unique per exposure
member := fmt.Sprintf("%d:%s", now.UnixNano(), req.PackageID)

pipe := a.rdb.Pipeline()
pipe := a.store.Pipeline(ctx)

// Add to package-level sorted set
pkgKey := fmt.Sprintf("freq:pkg:%s:%s", req.PackageID, tokenHash)
pipe.ZAdd(ctx, pkgKey, redis.Z{Score: ts, Member: member})
// Set TTL to longest window + buffer to auto-cleanup
pipe.ZAdd(ctx, pkgKey, ts, member)
if len(pkg.FrequencyRules) > 0 {
maxWindow := maxRuleWindow(pkg.FrequencyRules)
pipe.Expire(ctx, pkgKey, maxWindow+time.Hour)
Expand All @@ -163,7 +161,7 @@ func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tm
var campKey string
if campaignID != "" {
campKey = fmt.Sprintf("freq:campaign:%s:%s", campaignID, tokenHash)
pipe.ZAdd(ctx, campKey, redis.Z{Score: ts, Member: member})
pipe.ZAdd(ctx, campKey, ts, member)
if camp, ok := a.campaigns[campaignID]; ok && len(camp.FrequencyRules) > 0 {
maxWindow := maxRuleWindow(camp.FrequencyRules)
pipe.Expire(ctx, campKey, maxWindow+time.Hour)
Expand All @@ -174,8 +172,7 @@ func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tm
intentKey := fmt.Sprintf("intent:%s:%s", req.PackageID, tokenHash)
pipe.Set(ctx, intentKey, now.Unix(), 7*24*time.Hour)

_, err := pipe.Exec(ctx)
if err != nil {
if err := pipe.Exec(ctx); err != nil {
return nil, err
}

Expand All @@ -186,7 +183,7 @@ func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tm
if camp, ok := a.campaigns[campaignID]; ok && len(camp.FrequencyRules) > 0 {
shortestRule := camp.FrequencyRules[0]
cutoff := float64(now.Add(-shortestRule.Window).UnixMilli())
count, _ := a.rdb.ZCount(ctx, campKey, fmt.Sprintf("%f", cutoff), "+inf").Result()
count, _ := a.store.ZCount(ctx, campKey, cutoff, 1e18)
resp.CampaignCount = int(count)
resp.CampaignRemaining = shortestRule.MaxCount - int(count)
if resp.CampaignRemaining < 0 {
Expand All @@ -200,13 +197,12 @@ func (a *IdentityAgent) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tm

// checkFrequencyRules checks all frequency rules against a sorted set.
// Returns true (capped) if ANY rule is exceeded.
// Each rule is a sliding window: count entries within [now-window, now].
func (a *IdentityAgent) checkFrequencyRules(ctx context.Context, key string, rules []FrequencyRule) (bool, error) {
now := time.Now()
for _, rule := range rules {
cutoff := float64(now.Add(-rule.Window).UnixMilli())
count, err := a.rdb.ZCount(ctx, key, fmt.Sprintf("%f", cutoff), "+inf").Result()
if err != nil && err != redis.Nil {
count, err := a.store.ZCount(ctx, key, cutoff, 1e18)
if err != nil {
return false, err
}
if int(count) >= rule.MaxCount {
Expand All @@ -229,7 +225,7 @@ func maxRuleWindow(rules []FrequencyRule) time.Duration {
func (a *IdentityAgent) checkAudienceMatch(ctx context.Context, tokenHash string, segments []string) (bool, error) {
for _, seg := range segments {
key := fmt.Sprintf("audience:%s", seg)
member, err := a.rdb.SIsMember(ctx, key, tokenHash).Result()
member, err := a.store.SIsMember(ctx, key, tokenHash)
if err != nil {
return false, err
}
Expand All @@ -242,12 +238,16 @@ func (a *IdentityAgent) checkAudienceMatch(ctx context.Context, tokenHash string

func (a *IdentityAgent) computeIntentScore(ctx context.Context, tokenHash, packageID string) (float64, error) {
key := fmt.Sprintf("intent:%s:%s", packageID, tokenHash)
ts, err := a.rdb.Get(ctx, key).Int64()
if err == redis.Nil {
val, err := a.store.Get(ctx, key)
if err != nil {
return 0, err
}
if val == "" {
return 0, nil
}
ts, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return 0, err
return 0, nil
}
hoursSince := time.Since(time.Unix(ts, 0)).Hours()
score := 1.0 - (hoursSince / 168.0)
Expand All @@ -263,7 +263,7 @@ func (a *IdentityAgent) LoadAudienceSegment(ctx context.Context, segmentID strin
for i, tok := range userTokens {
members[i] = hashToken(tok)
}
return a.rdb.SAdd(ctx, key, members...).Err()
return a.store.SAdd(ctx, key, members...)
}

func hashToken(token string) string {
Expand Down
103 changes: 80 additions & 23 deletions reference/identity-agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func setupTest(t *testing.T) (*IdentityAgent, *miniredis.Miniredis) {
}

rdb := redis.NewClient(&redis.Options{Addr: mr.Addr()})
agent := NewIdentityAgent(rdb,
store := NewRedisStore(rdb)
agent := NewIdentityAgent(store,
[]PackageConfig{
{
PackageID: "pkg-display-001",
Expand All @@ -39,8 +40,8 @@ func setupTest(t *testing.T) (*IdentityAgent, *miniredis.Miniredis) {
PackageID: "pkg-multi-rule",
CampaignID: "campaign-acme",
FrequencyRules: []FrequencyRule{
{MaxCount: 2, Window: 12 * time.Hour}, // 2 per 12h
{MaxCount: 5, Window: 7 * 24 * time.Hour}, // AND 5 per week
{MaxCount: 2, Window: 12 * time.Hour},
{MaxCount: 5, Window: 7 * 24 * time.Hour},
},
},
{
Expand Down Expand Up @@ -86,14 +87,13 @@ func TestExpose_CampaignFrequencyCap(t *testing.T) {
defer mr.Close()
ctx := context.Background()

agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})
_ = agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})

// 5 exposures across two packages in campaign-acme (campaign cap is 5)
for i := 0; i < 3; i++ {
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
}
for i := 0; i < 2; i++ {
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-002"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-002"})
}

resp, err := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
Expand All @@ -117,11 +117,10 @@ func TestExpose_PackageCappedButCampaignNot(t *testing.T) {
defer mr.Close()
ctx := context.Background()

agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})
_ = agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})

// 3 exposures on pkg-display-001 (package cap=3, campaign cap=5)
for i := 0; i < 3; i++ {
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
}

resp, err := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
Expand Down Expand Up @@ -151,10 +150,8 @@ func TestMultipleFrequencyRules(t *testing.T) {
defer mr.Close()
ctx := context.Background()

// pkg-multi-rule: 2 per 12h AND 5 per 7d
// Expose 2 times — should hit the 12h cap
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-multi-rule"})
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-multi-rule"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-multi-rule"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-multi-rule"})

resp, err := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "id-test-multi",
Expand All @@ -175,26 +172,21 @@ func TestSlidingWindow_OldExposuresExpire(t *testing.T) {
defer mr.Close()
ctx := context.Background()

agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})
_ = agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})

// Expose 3 times (hits package cap of 3 per 24h)
for i := 0; i < 3; i++ {
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
}

// Should be capped now
resp, _ := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "id-before", UserToken: "user-abc", PackageIDs: []string{"pkg-display-001"},
})
if resp.Eligibility[0].Eligible {
t.Error("should be capped (3/3 in 24h)")
}

// Fast-forward miniredis by 25 hours — exposures fall outside the 24h window
mr.FastForward(25 * time.Hour)

// The sorted set entries still exist but their timestamps are now >24h old.
// ZCOUNT with the sliding window cutoff should return 0.
resp, _ = agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "id-after", UserToken: "user-abc", PackageIDs: []string{"pkg-display-001"},
})
Expand All @@ -208,8 +200,8 @@ func TestExpose_IntentScoreUpdated(t *testing.T) {
defer mr.Close()
ctx := context.Background()

agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})
agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})
_ = agent.LoadAudienceSegment(ctx, "cooking", []string{"user-abc"})
_, _ = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-abc", PackageID: "pkg-display-001"})

resp, err := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "id-intent", UserToken: "user-abc", PackageIDs: []string{"pkg-display-001"},
Expand Down Expand Up @@ -264,3 +256,68 @@ func TestUnknownPackage(t *testing.T) {
t.Error("unknown package should not be eligible")
}
}

// --- In-Memory Store Tests ---

func TestInMemoryStore_FullFlow(t *testing.T) {
store := NewInMemoryStore()
agent := NewIdentityAgent(store,
[]PackageConfig{
{PackageID: "pkg-1", CampaignID: "camp-1", FrequencyRules: []FrequencyRule{{MaxCount: 2, Window: time.Hour}}},
},
[]CampaignConfig{
{CampaignID: "camp-1", FrequencyRules: []FrequencyRule{{MaxCount: 3, Window: 24 * time.Hour}}},
},
)
ctx := context.Background()

// Two exposures should work
_, err := agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-1", PackageID: "pkg-1"})
if err != nil {
t.Fatal(err)
}
_, err = agent.Expose(ctx, &tmp.ExposeRequest{UserToken: "user-1", PackageID: "pkg-1"})
if err != nil {
t.Fatal(err)
}

// Should now be capped
resp, err := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "test", UserToken: "user-1", PackageIDs: []string{"pkg-1"},
})
if err != nil {
t.Fatal(err)
}
if resp.Eligibility[0].Eligible {
t.Error("should be capped after 2 exposures (in-memory store)")
}
}

func TestInMemoryStore_AudienceSegments(t *testing.T) {
store := NewInMemoryStore()
agent := NewIdentityAgent(store,
[]PackageConfig{
{PackageID: "pkg-1", TargetSegments: []string{"vip"}},
},
nil,
)
ctx := context.Background()

// Not in segment
resp, _ := agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "t1", UserToken: "user-1", PackageIDs: []string{"pkg-1"},
})
if resp.Eligibility[0].Eligible {
t.Error("should not be eligible (not in segment)")
}

// Load segment
_ = agent.LoadAudienceSegment(ctx, "vip", []string{"user-1"})

resp, _ = agent.IdentityMatch(ctx, &tmp.IdentityMatchRequest{
RequestID: "t2", UserToken: "user-1", PackageIDs: []string{"pkg-1"},
})
if !resp.Eligibility[0].Eligible {
t.Error("should be eligible after segment load")
}
}
Loading