-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathflag_definition_cache.py
More file actions
144 lines (114 loc) · 4.96 KB
/
flag_definition_cache.py
File metadata and controls
144 lines (114 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Flag Definition Cache Provider interface for multi-worker environments.
This module provides an interface for external caching of feature flag definitions,
enabling multi-worker environments (Kubernetes, load-balanced servers, serverless
functions) to share flag definitions and reduce API calls.
Usage:
from posthog import Posthog
from posthog.flag_definition_cache import FlagDefinitionCacheProvider
cache = RedisFlagDefinitionCache(redis_client, "my-team")
posthog = Posthog(
"<project_api_key>",
personal_api_key="<personal_api_key>",
flag_definition_cache_provider=cache,
)
"""
from typing import (
Any,
Awaitable,
Dict,
List,
Optional,
Protocol,
Union,
runtime_checkable,
)
from typing_extensions import Required, TypedDict
class FlagDefinitionCacheData(TypedDict):
    """
    Data structure for cached flag definitions.

    All three keys are marked ``Required``.

    Attributes:
        flags: List of feature flag definition dictionaries from the API.
        group_type_mapping: Mapping of group type indices to group names.
        cohorts: Dictionary of cohort definitions for local evaluation.
    """

    flags: Required[List[Dict[str, Any]]]
    group_type_mapping: Required[Dict[str, str]]
    cohorts: Required[Dict[str, Any]]
@runtime_checkable
class FlagDefinitionCacheProvider(Protocol):
    """
    Interface for external caching of feature flag definitions.

    Enables multi-worker environments to share flag definitions, reducing API
    calls while ensuring all workers have consistent data.

    Methods may be implemented as either synchronous functions or async
    functions. If a method returns an awaitable, the SDK runs it to completion
    before continuing.

    The four methods handle the complete lifecycle of flag definition caching:

    1. `should_fetch_flag_definitions()` - Called before each poll to determine
       if this worker should fetch new definitions. Use for distributed lock
       coordination to ensure only one worker fetches at a time.

    2. `get_flag_definitions()` - Called when `should_fetch_flag_definitions()`
       returns False. Returns cached definitions if available.

    3. `on_flag_definitions_received()` - Called after successfully fetching
       new definitions from the API. Store the data in your external cache
       and release any locks.

    4. `shutdown()` - Called when the PostHog client shuts down. Release any
       distributed locks and clean up resources.

    Error Handling:
        All methods are wrapped in try/except. Errors will be logged but will
        never break flag evaluation. On error:

        - `should_fetch_flag_definitions()` errors default to fetching (fail-safe)
        - `get_flag_definitions()` errors fall back to API fetch
        - `on_flag_definitions_received()` errors are logged but flags remain in memory
        - `shutdown()` errors are logged but shutdown continues
    """

    def get_flag_definitions(
        self,
    ) -> Union[
        Optional[FlagDefinitionCacheData], Awaitable[Optional[FlagDefinitionCacheData]]
    ]:
        """
        Retrieve cached flag definitions.

        Returns:
            Cached flag definitions if available and valid, None otherwise.
            May return an awaitable resolving to the same value. Returning None
            will trigger a fetch from the API if this worker has no flags loaded
            yet.
        """
        ...

    def should_fetch_flag_definitions(self) -> Union[bool, Awaitable[bool]]:
        """
        Determine whether this instance should fetch new flag definitions.

        Use this for distributed lock coordination. Only one worker should
        return True to avoid thundering herd problems. A typical implementation
        uses a distributed lock (e.g., Redis SETNX) that expires after the
        poll interval.

        Returns:
            True if this instance should fetch from the API, False otherwise.
            May return an awaitable resolving to the same value. When False, the
            client will call `get_flag_definitions()` to retrieve cached data
            instead.
        """
        ...

    def on_flag_definitions_received(
        self, data: FlagDefinitionCacheData
    ) -> Optional[Awaitable[None]]:
        """
        Called after successfully receiving new flag definitions from PostHog.

        Use this to store the data in your external cache and release any
        distributed locks acquired in `should_fetch_flag_definitions()`.

        Args:
            data: The flag definitions to cache, containing flags,
                group_type_mapping, and cohorts.
        """
        ...

    def shutdown(self) -> Optional[Awaitable[None]]:
        """
        Called when the PostHog client shuts down.

        Use this to release any distributed locks and clean up resources.
        This method is called even if `should_fetch_flag_definitions()`
        returned False, so implementations should handle the case where
        no lock was acquired.
        """
        ...