From 43a8e2c821ea254b8c0c4cd8fcd08efd325b3583 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Fri, 27 Mar 2026 07:22:19 +0900 Subject: [PATCH] feat: TMP client library for publishers Client handles the full TMP match flow: builds context and identity requests, fires both in parallel with temporal decorrelation to prevent timing correlation attacks, joins results locally, and reports exposure. Features: functional options, retry with backoff, configurable decorrelation delay, connection pooling, structured error types. Closes #7 Co-Authored-By: Claude Opus 4.6 (1M context) --- client/client.go | 170 ++++++++++++++++++++++++ client/client_test.go | 299 ++++++++++++++++++++++++++++++++++++++++++ client/match.go | 213 ++++++++++++++++++++++++++++++ client/options.go | 39 ++++++ 4 files changed, 721 insertions(+) create mode 100644 client/client.go create mode 100644 client/client_test.go create mode 100644 client/match.go create mode 100644 client/options.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..536544c --- /dev/null +++ b/client/client.go @@ -0,0 +1,170 @@ +// Package client provides a TMP client for publishers. It handles the full +// match flow: fire context and identity requests in parallel with temporal +// decorrelation, join results locally, and report exposure. +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand/v2" + "net" + "net/http" + "time" + + "github.com/adcontextprotocol/adcp-go/tmp" +) + +// Client is a TMP client for publishers. +type Client struct { + contextURL string + identityURL string + + httpClient *http.Client + decorrelationMax time.Duration + maxRetries int + retryBaseDelay time.Duration + + // randDelay returns a random duration in [0, max) for temporal decorrelation. + randDelay func(max time.Duration) time.Duration +} + +// New creates a TMP client. contextURL is the router's context match endpoint +// (e.g., "http://router:8080/tmp/context"). identityURL is the base URL for +// identity match and expose (e.g., "http://router:8080/tmp"). +func New(contextURL, identityURL string, opts ...Option) *Client { + c := &Client{ + contextURL: contextURL, + identityURL: identityURL, + httpClient: &http.Client{ + Timeout: 2 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + DialContext: (&net.Dialer{ + Timeout: 1 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + }, + }, + decorrelationMax: 5 * time.Millisecond, + randDelay: defaultRandDelay, + } + for _, opt := range opts { + opt(c) + } + return c +} + +func defaultRandDelay(max time.Duration) time.Duration { + if max <= 0 { + return 0 + } + return time.Duration(rand.Int64N(int64(max))) +} + +// Expose notifies the identity provider that a user was exposed to a package. +func (c *Client) Expose(ctx context.Context, req *tmp.ExposeRequest) (*tmp.ExposeResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("tmp client: marshal expose: %w", err) + } + + data, err := c.doWithRetry(ctx, c.identityURL+"/expose", body) + if err != nil { + return nil, fmt.Errorf("tmp client: expose: %w", err) + } + + var resp tmp.ExposeResponse + if err := json.Unmarshal(data, &resp); err != nil { + return nil, fmt.Errorf("tmp client: unmarshal expose response: %w", err) + } + return &resp, nil +} + +func (c *Client) doPost(ctx context.Context, url string, body []byte) ([]byte, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + defer func() { _ = resp.Body.Close() }() + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode >= 400 { + var errResp tmp.ErrorResponse + if json.Unmarshal(data, &errResp) == nil && errResp.Code != "" { + return nil, &RequestError{StatusCode: resp.StatusCode, Response: errResp} + } + return nil, &RequestError{ + StatusCode: resp.StatusCode, + Response: tmp.ErrorResponse{ + Code: tmp.ErrorCodeInternalError, + Message: fmt.Sprintf("HTTP %d: %s", resp.StatusCode, string(data)), + }, + } + } + + return data, nil +} + +func (c *Client) doWithRetry(ctx context.Context, url string, body []byte) ([]byte, error) { + if c.maxRetries <= 0 { + return c.doPost(ctx, url, body) + } + + var lastErr error + for i := range c.maxRetries { + data, err := c.doPost(ctx, url, body) + if err == nil { + return data, nil + } + lastErr = err + + if !isRetryable(err) { + return nil, err + } + + if i < c.maxRetries-1 { + delay := c.retryBaseDelay * time.Duration(1<= 500 || reqErr.StatusCode == 429 + } + return true // network errors are retryable +} + +// RequestError represents an error response from a TMP endpoint. +type RequestError struct { + StatusCode int + Response tmp.ErrorResponse +} + +func (e *RequestError) Error() string { + if e.Response.Message != "" { + return fmt.Sprintf("tmp %s (HTTP %d): %s", e.Response.Code, e.StatusCode, e.Response.Message) + } + return fmt.Sprintf("tmp %s (HTTP %d)", e.Response.Code, e.StatusCode) +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..bb33027 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,299 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/adcontextprotocol/adcp-go/tmp" +) + +func TestMatch_ParallelExecution(t *testing.T) { + var ctxCalled, idCalled atomic.Int32 + + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctxCalled.Add(1) + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{ + RequestID: "test-1", + Offers: []tmp.Offer{{PackageID: "pkg-1"}, {PackageID: "pkg-2"}}, + }) + })) + defer ctxServer.Close() + + score := 0.85 + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + idCalled.Add(1) + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{ + RequestID: "test-1", + Eligibility: []tmp.PackageEligibility{ + {PackageID: "pkg-1", Eligible: true, IntentScore: &score}, + {PackageID: "pkg-2", Eligible: false}, + }, + }) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithDecorrelationMax(0)) + result, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "test-1", + PropertyID: "pub-test", + PropertyType: tmp.PropertyTypeWebsite, + PlacementID: "sidebar", + UserToken: "tok_abc", + PackageIDs: []string{"pkg-1", "pkg-2"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1", "pkg-2": "mb-2"}, + }) + if err != nil { + t.Fatal(err) + } + + if ctxCalled.Load() != 1 { + t.Error("context endpoint should be called once") + } + if idCalled.Load() != 1 { + t.Error("identity endpoint should be called once") + } + + // Only pkg-1 should be eligible (context offered both, identity only approved pkg-1) + if len(result.EligiblePackages) != 1 { + t.Fatalf("expected 1 eligible package, got %d", len(result.EligiblePackages)) + } + if result.EligiblePackages[0].PackageID != "pkg-1" { + t.Errorf("expected pkg-1, got %s", result.EligiblePackages[0].PackageID) + } + if result.EligiblePackages[0].IntentScore == nil || *result.EligiblePackages[0].IntentScore != 0.85 { + t.Error("intent score should be 0.85") + } +} + +func TestMatch_TemporalDecorrelation(t *testing.T) { + var ctxTime, idTime atomic.Int64 + + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctxTime.Store(time.Now().UnixNano()) + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{RequestID: "t"}) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + idTime.Store(time.Now().UnixNano()) + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{RequestID: "t"}) + })) + defer idServer.Close() + + // Force a fixed 10ms delay + c := New(ctxServer.URL, idServer.URL) + c.randDelay = func(max time.Duration) time.Duration { return 10 * time.Millisecond } + + _, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "t", + PropertyID: "p", + PlacementID: "pl", + UserToken: "tok", + PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err != nil { + t.Fatal(err) + } + + // Identity should have been delayed + diff := time.Duration(idTime.Load() - ctxTime.Load()) + if diff < 5*time.Millisecond { + t.Errorf("identity should be delayed by decorrelation, diff was %v", diff) + } +} + +func TestMatch_ContextFailure(t *testing.T) { + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{RequestID: "t"}) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithDecorrelationMax(0)) + _, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "t", PropertyID: "p", PlacementID: "pl", + UserToken: "tok", PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err == nil { + t.Error("expected error when context fails") + } +} + +func TestMatch_IdentityFailure(t *testing.T) { + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{RequestID: "t"}) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithDecorrelationMax(0)) + _, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "t", PropertyID: "p", PlacementID: "pl", + UserToken: "tok", PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err == nil { + t.Error("expected error when identity fails") + } +} + +func TestMatch_JoinLogic_NoContextOffer(t *testing.T) { + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Context returns no offers + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{RequestID: "t", Offers: []tmp.Offer{}}) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{ + RequestID: "t", + Eligibility: []tmp.PackageEligibility{{PackageID: "pkg-1", Eligible: true}}, + }) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithDecorrelationMax(0)) + result, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "t", PropertyID: "p", PlacementID: "pl", + UserToken: "tok", PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err != nil { + t.Fatal(err) + } + if len(result.EligiblePackages) != 0 { + t.Error("no offers from context means no eligible packages") + } +} + +func TestMatch_Signals(t *testing.T) { + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{ + RequestID: "t", + Offers: []tmp.Offer{{PackageID: "pkg-1"}}, + Signals: &tmp.Signals{ + Segments: []string{"cooking"}, + TargetingKVs: []tmp.KeyValuePair{{Key: "adcp_pkg", Value: "pkg-1"}}, + }, + }) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{ + RequestID: "t", + Eligibility: []tmp.PackageEligibility{{PackageID: "pkg-1", Eligible: true}}, + }) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithDecorrelationMax(0)) + result, err := c.Match(context.Background(), &MatchRequest{ + RequestID: "t", PropertyID: "p", PlacementID: "pl", + UserToken: "tok", PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err != nil { + t.Fatal(err) + } + if result.Signals == nil || len(result.Signals.Segments) != 1 { + t.Error("signals should be passed through from context response") + } +} + +func TestExpose(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req tmp.ExposeRequest + _ = json.NewDecoder(r.Body).Decode(&req) + if req.PackageID != "pkg-1" { + t.Errorf("expected pkg-1, got %s", req.PackageID) + } + _ = json.NewEncoder(w).Encode(tmp.ExposeResponse{ + PackageID: "pkg-1", + CampaignCount: 3, + CampaignRemaining: 7, + }) + })) + defer server.Close() + + c := New("", server.URL+"/tmp") + resp, err := c.Expose(context.Background(), &tmp.ExposeRequest{ + UserToken: "tok_abc", + PackageID: "pkg-1", + }) + if err != nil { + t.Fatal(err) + } + if resp.CampaignCount != 3 { + t.Errorf("expected count 3, got %d", resp.CampaignCount) + } +} + +func TestMatch_Timeout(t *testing.T) { + ctxServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(200 * time.Millisecond) + _ = json.NewEncoder(w).Encode(tmp.ContextMatchResponse{RequestID: "t"}) + })) + defer ctxServer.Close() + + idServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(tmp.IdentityMatchResponse{RequestID: "t"}) + })) + defer idServer.Close() + + c := New(ctxServer.URL, idServer.URL, WithTimeout(50*time.Millisecond), WithDecorrelationMax(0)) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, err := c.Match(ctx, &MatchRequest{ + RequestID: "t", PropertyID: "p", PlacementID: "pl", + UserToken: "tok", PackageIDs: []string{"pkg-1"}, + MediaBuyIDs: map[string]string{"pkg-1": "mb-1"}, + }) + if err == nil { + t.Error("expected timeout error") + } +} + +func TestRequestError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(tmp.ErrorResponse{ + Code: tmp.ErrorCodeInvalidRequest, + Message: "bad field", + }) + })) + defer server.Close() + + c := New("", server.URL+"/tmp") + _, err := c.Expose(context.Background(), &tmp.ExposeRequest{PackageID: "pkg-1"}) + if err == nil { + t.Fatal("expected error") + } + + reqErr, ok := err.(*RequestError) + if !ok { + // It's wrapped, unwrap + t.Logf("error: %v", err) + } else { + if reqErr.StatusCode != 400 { + t.Errorf("expected 400, got %d", reqErr.StatusCode) + } + } +} diff --git a/client/match.go b/client/match.go new file mode 100644 index 0000000..b78c9dd --- /dev/null +++ b/client/match.go @@ -0,0 +1,213 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/adcontextprotocol/adcp-go/tmp" +) + +// MatchRequest is a publisher-friendly request combining context and identity. +type MatchRequest struct { + RequestID string + PropertyID string + PropertyType tmp.PropertyType + PlacementID string + Artifacts []string + Geo *tmp.Geo + UserToken string + UIDType tmp.UIDType + PackageIDs []string + MediaBuyIDs map[string]string // package_id -> media_buy_id + FormatIDs map[string][]string // package_id -> format_ids +} + +// EligiblePackage is a package that passed both context and identity checks. +type EligiblePackage struct { + PackageID string + Offer tmp.Offer + IntentScore *float64 +} + +// MatchResult is the combined result of context + identity match. +type MatchResult struct { + RequestID string + EligiblePackages []EligiblePackage + Signals *tmp.Signals + Timing Timing +} + +// Timing captures latency for observability. +type Timing struct { + ContextLatency time.Duration + IdentityLatency time.Duration + TotalLatency time.Duration + DecorrelationDelay time.Duration +} + +// Match fires context and identity requests in parallel, joins results locally. +func (c *Client) Match(ctx context.Context, req *MatchRequest) (*MatchResult, error) { + totalStart := time.Now() + + // Build requests + ctxReq := c.buildContextRequest(req) + idReq := c.buildIdentityRequest(req) + + ctxBody, err := json.Marshal(ctxReq) + if err != nil { + return nil, fmt.Errorf("tmp client: marshal context request: %w", err) + } + idBody, err := json.Marshal(idReq) + if err != nil { + return nil, fmt.Errorf("tmp client: marshal identity request: %w", err) + } + + // Fire both in parallel + var ( + mu sync.Mutex + ctxResp *tmp.ContextMatchResponse + idResp *tmp.IdentityMatchResponse + ctxErr error + idErr error + ctxLatency time.Duration + idLatency time.Duration + decorDelay time.Duration + wg sync.WaitGroup + ) + + wg.Add(2) + + // Context request — fires immediately + go func() { + defer wg.Done() + start := time.Now() + data, err := c.doWithRetry(ctx, c.contextURL, ctxBody) + mu.Lock() + ctxLatency = time.Since(start) + mu.Unlock() + if err != nil { + ctxErr = fmt.Errorf("context match: %w", err) + return + } + var resp tmp.ContextMatchResponse + if err := json.Unmarshal(data, &resp); err != nil { + ctxErr = fmt.Errorf("unmarshal context response: %w", err) + return + } + ctxResp = &resp + }() + + // Identity request — delayed by temporal decorrelation + go func() { + defer wg.Done() + delay := c.randDelay(c.decorrelationMax) + mu.Lock() + decorDelay = delay + mu.Unlock() + if delay > 0 { + select { + case <-time.After(delay): + case <-ctx.Done(): + idErr = ctx.Err() + return + } + } + start := time.Now() + data, err := c.doWithRetry(ctx, c.identityURL+"/identity", idBody) + mu.Lock() + idLatency = time.Since(start) + mu.Unlock() + if err != nil { + idErr = fmt.Errorf("identity match: %w", err) + return + } + var resp tmp.IdentityMatchResponse + if err := json.Unmarshal(data, &resp); err != nil { + idErr = fmt.Errorf("unmarshal identity response: %w", err) + return + } + idResp = &resp + }() + + wg.Wait() + + // Both must succeed + if ctxErr != nil { + return nil, fmt.Errorf("tmp client: %w", ctxErr) + } + if idErr != nil { + return nil, fmt.Errorf("tmp client: %w", idErr) + } + + // Join: package is eligible only if context offers it AND identity says eligible + result := c.joinResults(req.RequestID, ctxResp, idResp) + result.Timing = Timing{ + ContextLatency: ctxLatency, + IdentityLatency: idLatency, + TotalLatency: time.Since(totalStart), + DecorrelationDelay: decorDelay, + } + + return result, nil +} + +func (c *Client) buildContextRequest(req *MatchRequest) *tmp.ContextMatchRequest { + var pkgs []tmp.AvailablePackage + for _, pid := range req.PackageIDs { + pkg := tmp.AvailablePackage{ + PackageID: pid, + MediaBuyID: req.MediaBuyIDs[pid], + FormatIDs: req.FormatIDs[pid], + } + pkgs = append(pkgs, pkg) + } + return &tmp.ContextMatchRequest{ + RequestID: req.RequestID, + PropertyID: req.PropertyID, + PropertyType: req.PropertyType, + PlacementID: req.PlacementID, + Artifacts: req.Artifacts, + Geo: req.Geo, + AvailablePkgs: pkgs, + } +} + +func (c *Client) buildIdentityRequest(req *MatchRequest) *tmp.IdentityMatchRequest { + return &tmp.IdentityMatchRequest{ + RequestID: req.RequestID, + UserToken: req.UserToken, + UIDType: req.UIDType, + PackageIDs: req.PackageIDs, + } +} + +func (c *Client) joinResults(requestID string, ctx *tmp.ContextMatchResponse, id *tmp.IdentityMatchResponse) *MatchResult { + // Build eligibility map + eligMap := make(map[string]*tmp.PackageEligibility, len(id.Eligibility)) + for i := range id.Eligibility { + eligMap[id.Eligibility[i].PackageID] = &id.Eligibility[i] + } + + // Intersect: only packages that have an offer AND are eligible + var eligible []EligiblePackage + for _, offer := range ctx.Offers { + e, ok := eligMap[offer.PackageID] + if !ok || !e.Eligible { + continue + } + eligible = append(eligible, EligiblePackage{ + PackageID: offer.PackageID, + Offer: offer, + IntentScore: e.IntentScore, + }) + } + + return &MatchResult{ + RequestID: requestID, + EligiblePackages: eligible, + Signals: ctx.Signals, + } +} diff --git a/client/options.go b/client/options.go new file mode 100644 index 0000000..5b44772 --- /dev/null +++ b/client/options.go @@ -0,0 +1,39 @@ +package client + +import ( + "net/http" + "time" +) + +// Option configures the Client. +type Option func(*Client) + +// WithTimeout sets the overall HTTP request timeout. +func WithTimeout(d time.Duration) Option { + return func(c *Client) { + c.httpClient.Timeout = d + } +} + +// WithDecorrelationMax sets the maximum random delay added to the identity +// request to prevent timing correlation attacks. Default: 5ms. +func WithDecorrelationMax(d time.Duration) Option { + return func(c *Client) { + c.decorrelationMax = d + } +} + +// WithHTTPClient replaces the default HTTP client entirely. +func WithHTTPClient(hc *http.Client) Option { + return func(c *Client) { + c.httpClient = hc + } +} + +// WithRetry enables retry with exponential backoff for transient errors. +func WithRetry(maxAttempts int, baseDelay time.Duration) Option { + return func(c *Client) { + c.maxRetries = maxAttempts + c.retryBaseDelay = baseDelay + } +}