diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index fa65285..b98725e 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -28,8 +28,8 @@ jobs: - name: Set up Elixir uses: erlef/setup-beam@v1 with: - elixir-version: '1.14.1' # Define the elixir version [required] - otp-version: '25.1.2' # Define the OTP version [required] + version-type: strict + version-file: .tool-versions - name: Restore dependencies cache uses: actions/cache@v3 @@ -49,5 +49,8 @@ jobs: - name: Run Coveralls run: mix coveralls.json - - name: Upload to codecov.io - uses: codecov/codecov-action@v1 + - uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: true + verbose: true \ No newline at end of file diff --git a/.github/workflows/credo.yml b/.github/workflows/credo.yml index c0e59df..fab7765 100644 --- a/.github/workflows/credo.yml +++ b/.github/workflows/credo.yml @@ -15,8 +15,8 @@ jobs: - name: Set up Elixir uses: erlef/setup-beam@v1 with: - elixir-version: '1.14.1' # Define the elixir version [required] - otp-version: '25.1.2' # Define the OTP version [required] + version-type: strict + version-file: .tool-versions - name: Restore dependencies cache uses: actions/cache@v3 diff --git a/.github/workflows/dialyzer.yml b/.github/workflows/dialyzer.yml index f1f236f..fae52cc 100644 --- a/.github/workflows/dialyzer.yml +++ b/.github/workflows/dialyzer.yml @@ -15,8 +15,8 @@ jobs: - name: Set up Elixir uses: erlef/setup-beam@v1 with: - elixir-version: '1.14.1' # Define the elixir version [required] - otp-version: '25.1.2' # Define the OTP version [required] + version-type: strict + version-file: .tool-versions - name: Restore dependencies cache uses: actions/cache@v3 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 566e1d4..71bc26e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,8 +28,8 @@ jobs: - name: Set up Elixir uses: erlef/setup-beam@v1 with: - elixir-version: '1.14.1' # Define the elixir version [required] - otp-version: '25.1.2' # Define the OTP version [required] + version-type: strict + version-file: .tool-versions - name: Restore dependencies cache uses: actions/cache@v3 diff --git a/guides/explanation/architecture.md b/guides/explanation/architecture.md index b69e877..bb6d161 100644 --- a/guides/explanation/architecture.md +++ b/guides/explanation/architecture.md @@ -8,9 +8,10 @@ ElixirCache is designed around a simple principle: provide a consistent interfac 1. **Core Interface**: Defined by the `Cache` module 2. **Adapters**: Backend-specific implementations -3. **Term Encoder**: Handles serialization and compression -4. **Telemetry Integration**: For observability and metrics -5. **Sandbox System**: For isolated testing +3. **Strategy Adapters**: Higher-level patterns that compose over adapters +4. **Term Encoder**: Handles serialization and compression +5. **Telemetry Integration**: For observability and metrics +6. **Sandbox System**: For isolated testing ## The Cache Behaviour @@ -82,6 +83,31 @@ A simple implementation using Elixir's Agent for lightweight in-memory storage. Wraps the ConCache library to provide its expiration and callback capabilities. +## Strategy Adapters + +Strategy adapters implement the `Cache.Strategy` behaviour and compose over +regular adapters to provide higher-level caching patterns. They are specified +using a tuple format: `adapter: {StrategyModule, UnderlyingAdapterOrConfig}`. + +### Cache.HashRing + +Distributes cache keys across Erlang cluster nodes using a consistent hash ring +powered by `libring`. Operations are forwarded to the owning node via +`:erpc.call/4`. The ring monitors node membership automatically. + +### Cache.MultiLayer + +Chains multiple cache modules together. Reads cascade fastest → slowest with +automatic backfill on slower-layer hits. Writes go slowest → fastest to ensure +durability before populating fast layers. + +### Cache.RefreshAhead + +Proactively refreshes values in the background before they expire. When a `get` +detects a value is within the refresh window, it returns the current value +immediately and spawns an async task to fetch a fresh one. Uses per-node ETS +deduplication and cross-node `:global` locking to prevent redundant refreshes. + ## Telemetry Integration ElixirCache provides telemetry events for all cache operations: diff --git a/guides/how-to/choosing_adapter.md b/guides/how-to/choosing_adapter.md index 903b4e9..62a2fa7 100644 --- a/guides/how-to/choosing_adapter.md +++ b/guides/how-to/choosing_adapter.md @@ -51,7 +51,7 @@ defmodule MyApp.PersistentCache do adapter: Cache.DETS, name: :my_app_persistent_cache, opts: [ - file: "cache_data.dets" + file_path: "/tmp/cache_data" ] end ``` @@ -71,9 +71,9 @@ defmodule MyApp.DistributedCache do adapter: Cache.Redis, name: :my_app_redis_cache, opts: [ - host: "localhost", - port: 6379, - pool_size: 5 + uri: "redis://localhost:6379", + size: 10, + max_overflow: 5 ] end ``` @@ -143,24 +143,12 @@ defmodule MyApp.Cache do name: :my_app_cache, opts: get_opts() - defp get_adapter do - case Mix.env() do - :test -> Cache.Sandbox - :dev -> Cache.ETS - :prod -> Cache.Redis - end - end - - defp get_opts do - case Mix.env() do - :test -> [] - :dev -> [read_concurrency: true] - :prod -> [ - host: System.get_env("REDIS_HOST", "localhost"), - port: String.to_integer(System.get_env("REDIS_PORT", "6379")), - pool_size: 10 - ] - end + if Mix.env() === :test do + defp get_adapter, do: Cache.ETS + defp get_opts, do: [] + else + defp get_adapter, do: Cache.Redis + defp get_opts, do: [uri: "redis://localhost:6379", size: 10] end end ``` diff --git a/guides/how-to/redis_setup.md b/guides/how-to/redis_setup.md index 672b47d..5070aa4 100644 --- a/guides/how-to/redis_setup.md +++ b/guides/how-to/redis_setup.md @@ -21,9 +21,9 @@ defmodule MyApp.RedisCache do adapter: Cache.Redis, name: :my_app_redis, opts: [ - host: "localhost", - port: 6379, - pool_size: 5 + uri: "redis://localhost:6379", + size: 10, + max_overflow: 5 ] end ``` @@ -46,7 +46,16 @@ end ## Redis Configuration Options -The Redis adapter supports various configuration options: +The Redis adapter accepts the following options: + +| Option | Type | Required | Description | +|---|---|---|---| +| `uri` | `string` | Yes | Redis connection URI (e.g. `"redis://localhost:6379"`, `"redis://:password@host:6379/0"`) | +| `size` | `pos_integer` | No | Number of workers in the connection pool (default: 50) | +| `max_overflow` | `pos_integer` | No | Maximum overflow workers the pool can create (default: 20) | +| `strategy` | `:fifo` or `:lifo` | No | Queue strategy for the Poolboy connection pool | + +Authentication, database selection, and SSL are configured via the URI string: ```elixir defmodule MyApp.RedisCache do @@ -54,30 +63,9 @@ defmodule MyApp.RedisCache do adapter: Cache.Redis, name: :my_app_redis, opts: [ - # Connection settings - host: "redis.example.com", - port: 6379, - password: "your_password", # Optional - database: 0, # Optional, default is 0 - - # Connection pool settings - pool_size: 10, # Number of connections in the pool - max_overflow: 5, # Maximum number of overflow workers - - # Timeout settings - timeout: 5000, # Connection timeout in milliseconds - - # SSL options - ssl: true, # Enable SSL - ssl_opts: [ # SSL options - verify: :verify_peer, - cacertfile: "/path/to/ca_certificate.pem", - certfile: "/path/to/client_certificate.pem", - keyfile: "/path/to/client_key.pem" - ], - - # Encoding options - compression_level: 1 # Level of compression (0-9, higher = more compression) + uri: "redis://:my_password@redis.example.com:6379/2", + size: 10, + max_overflow: 5 ] end ``` @@ -92,10 +80,9 @@ defmodule MyApp.RedisCache do adapter: Cache.Redis, name: :my_app_redis, opts: [ - host: System.get_env("REDIS_HOST", "localhost"), - port: String.to_integer(System.get_env("REDIS_PORT", "6379")), - password: System.get_env("REDIS_PASSWORD"), - pool_size: String.to_integer(System.get_env("REDIS_POOL_SIZE", "10")) + uri: System.get_env("REDIS_URL", "redis://localhost:6379"), + size: String.to_integer(System.get_env("REDIS_POOL_SIZE", "10")), + max_overflow: 5 ] end ``` diff --git a/guides/how-to/testing_with_cache.md b/guides/how-to/testing_with_cache.md index 6bfbeea..c0f1fc3 100644 --- a/guides/how-to/testing_with_cache.md +++ b/guides/how-to/testing_with_cache.md @@ -4,105 +4,71 @@ This guide explains how to effectively test applications that use ElixirCache, f ## Using the Sandbox Mode -ElixirCache provides a sandbox mode specifically designed for testing. This ensures that your tests: +ElixirCache provides a sandbox mode that gives each test its own isolated cache +namespace. This ensures your tests: 1. Are isolated from each other 2. Don't leave lingering cache data between test runs 3. Can run in parallel without conflicts -### Configuring Your Cache for Testing +### Configuring Your Cache -In your test environment, you can wrap any cache adapter with the sandbox functionality: +Use `sandbox?: Mix.env() === :test` on your cache module. The adapter stays the +same in every environment — the sandbox wraps whatever adapter you choose: ```elixir -# In lib/my_app/cache.ex defmodule MyApp.Cache do use Cache, - adapter: get_cache_adapter(), + adapter: Cache.Redis, name: :my_app_cache, - opts: get_cache_opts(), - # Enable sandbox mode in test environment - sandbox?: Mix.env() == :test - - defp get_cache_adapter do - case Mix.env() do - :test -> Cache.ETS - :dev -> Cache.ETS - :prod -> Cache.Redis - end - end - - defp get_cache_opts do - case Mix.env() do - :test -> [] - :dev -> [] - :prod -> [host: "redis.example.com", port: 6379] - end - end + opts: [uri: "redis://localhost:6379"], + sandbox?: Mix.env() === :test end ``` +When `sandbox?` is `true`, `Cache.Sandbox` is used as the adapter automatically. +You do not need to switch adapters between environments. + ### Setting Up the Sandbox Registry -To use the sandbox functionality in your tests, you need to start the `Cache.SandboxRegistry` in your test setup: +Add `Cache.SandboxRegistry.start_link()` to your `test/test_helper.exs`: ```elixir # In test/test_helper.exs +Cache.SandboxRegistry.start_link() ExUnit.start() - -# Start the sandbox registry for your tests -{:ok, _pid} = Cache.SandboxRegistry.start_link() - -# Start your application's supervision tree -Application.ensure_all_started(:my_app) ``` ### Using the Sandbox in Tests -Using the sandbox in your tests is very simple. All you need to do is start the sandbox registry in your setup block: +Register your cache in each test's `setup` block with +`Cache.SandboxRegistry.start/1`. This starts the cache supervisor and +registers the current test process for isolation: ```elixir defmodule MyApp.CacheTest do use ExUnit.Case, async: true - defmodule TestCache do - use Cache, - adapter: Cache.Redis, # The actual adapter doesn't matter in sandbox mode - name: :test_cache, - opts: [], - sandbox?: Mix.env() === :test - end - setup do - # This single line is all you need to set up sandbox isolation - Cache.SandboxRegistry.start(TestCache) + Cache.SandboxRegistry.start(MyApp.Cache) :ok end - - test "can store and retrieve values" do - assert :ok = TestCache.put("test-key", "test-value") - assert {:ok, "test-value"} = TestCache.get("test-key") - end - - test "can handle complex data structures" do - data = %{users: [%{name: "Alice"}, %{name: "Bob"}]} - assert :ok = TestCache.put("complex-key", data) - assert {:ok, ^data} = TestCache.get("complex-key") + + test "stores and retrieves values" do + assert :ok === MyApp.Cache.put("key", "value") + assert {:ok, "value"} === MyApp.Cache.get("key") end - test "provides isolation between tests" do - # This will return nil because each test has an isolated cache - assert {:ok, nil} = TestCache.get("test-key") + test "each test is isolated" do + assert {:ok, nil} === MyApp.Cache.get("key") end end ``` - ## Tips for Testing with ElixirCache -1. **Always use the sandbox in tests**: This prevents interference between tests. -2. **Clean up after each test**: Use `on_exit` to unregister from the sandbox. -3. **Use unique keys**: Even with sandboxing, using descriptive, unique keys makes debugging easier. -4. **Test edge cases**: Including cache misses, errors, and TTL expiration. -5. **Consider using fixtures**: For commonly cached data structures. -6. **Verify telemetry events**: If your application relies on cache metrics. +1. **Always use `sandbox?: Mix.env() === :test`**: Keep the same adapter everywhere — the sandbox handles isolation. +2. **Use `Cache.SandboxRegistry.start/1` in setup**: This is the only line needed per test. +3. **Tests can be `async: true`**: Each test gets its own sandbox namespace. +4. **Test edge cases**: Cache misses, errors, and TTL expiration. +5. **Verify telemetry events**: If your application relies on cache metrics. diff --git a/guides/how-to/using_strategies.md b/guides/how-to/using_strategies.md new file mode 100644 index 0000000..a6988e0 --- /dev/null +++ b/guides/how-to/using_strategies.md @@ -0,0 +1,241 @@ +# Using Strategy Adapters + +Strategy adapters compose over existing cache adapters to provide higher-level +caching patterns. They are specified using a two-element tuple as the `adapter` +option: + +```elixir +use Cache, + adapter: {StrategyModule, UnderlyingAdapterOrConfig}, + name: :my_cache, + opts: [strategy_opt: value, underlying_adapter_opt: value] +``` + +The `opts` keyword list is shared between the strategy and the underlying +adapter. Strategy-specific keys are validated and consumed by the strategy; +any remaining keys are passed through to the underlying adapter. + +## Available Strategies + +| Strategy | Use Case | +|---|---| +| `Cache.HashRing` | Distribute keys across Erlang cluster nodes via consistent hashing | +| `Cache.MultiLayer` | Cascade reads/writes through multiple cache layers (e.g. ETS → Redis) | +| `Cache.RefreshAhead` | Proactively refresh hot keys in the background before they expire | + +--- + +## Cache.HashRing + +Distributes cache keys across Erlang cluster nodes using a consistent hash ring +powered by [`libring`](https://hex.pm/packages/libring). Each key always hashes +to the same node (given the same ring), so no cross-node coordination is needed +for reads — the operation is simply forwarded to the owning node via a +configurable RPC module (defaults to `:erpc`). + +### Usage + +```elixir +defmodule MyApp.DistributedCache do + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :distributed_cache, + opts: [read_concurrency: true] +end +``` + +Start it in your supervision tree: + +```elixir +children = [MyApp.DistributedCache] +Supervisor.start_link(children, strategy: :one_for_one) +``` + +### How It Works + +1. Every node starts the same underlying adapter locally (e.g. an ETS table). +2. On `get`/`put`/`delete`, the key is hashed against the managed ring to pick + the owning node. +3. If the owning node is `Node.self()`, the operation runs locally. +4. If it is a remote node, the operation is forwarded via the configured + `rpc_module` (default `:erpc`). + +The ring monitors node membership automatically (`monitor_nodes: true`), so +nodes joining or leaving the cluster are reflected without manual intervention. + +### Options + +| Option | Type | Default | Description | +|---|---|---|---| +| `ring_opts` | `keyword` | `[]` | Options passed to `HashRing.Worker`, such as `node_blacklist` and `node_whitelist`. | +| `node_weight` | `pos_integer` | `128` | Number of virtual nodes (shards) per node on the ring. Higher values give more even key distribution. | +| `rpc_module` | `atom` | `:erpc` | Module used for remote calls. Must implement `call/4` with the same signature as `:erpc.call/4`. | +| `ring_history_size` | `pos_integer` | `3` | Number of previous ring snapshots to retain for read-repair fallback. | + +### Read-Repair + +When a node joins or leaves the cluster, some keys will hash to a different +node. Rather than doing a full key migration scan, `Cache.HashRing` uses +**read-repair** to lazily migrate keys on demand: + +1. A `get` call misses on the current owning node. +2. Previous ring snapshots are consulted in order (newest first), maintained + by the `Cache.HashRing.RingMonitor` GenServer. +3. For each previous ring, if the key hashed to a different reachable node, + a `get` is attempted there. +4. On a hit, the value is written to the current owner and the old node is + deleted asynchronously. + +Nodes that are unreachable are skipped automatically, and each node is only +tried once even if it appears in multiple historical ring snapshots. + +To control how many previous rings are retained: + +```elixir +opts: [ring_history_size: 5] +``` + +### Custom RPC Module + +To use a different RPC library (e.g. for timeout control or tracing): + +```elixir +opts: [rpc_module: MyApp.CustomRPC] +``` + +The module must export `call(node, module, function, args)`. + +--- + +## Cache.MultiLayer + +Chains multiple cache modules together. Reads walk the list fastest → slowest; +on a hit the value is backfilled into all faster layers. Writes go slowest → +fastest to ensure durability before populating fast layers. + +### Usage + +```elixir +defmodule MyApp.LayeredCache do + use Cache, + adapter: {Cache.MultiLayer, [MyApp.LocalCache, MyApp.RedisCache]}, + name: :layered_cache, + opts: [backfill_ttl: :timer.minutes(5)] +end +``` + +Each element in the layers list must be a module that exposes `get/1`, `put/3`, +and `delete/1` — i.e. a module defined with `use Cache`. + +### Fetch Callback on Total Miss + +If all layers miss, an optional `on_fetch` callback can supply the value and +backfill all layers: + +```elixir +defmodule MyApp.LayeredCache do + use Cache, + adapter: {Cache.MultiLayer, [MyApp.LocalCache, MyApp.RedisCache]}, + name: :layered_cache, + opts: [on_fetch: &__MODULE__.fetch/1] + + def fetch(key) do + {:ok, MyApp.Repo.get_value(key)} + end +end +``` + +### Options + +| Option | Type | Description | +|---|---|---| +| `on_fetch` | `fun/1` or MFA | Called on total miss. Receives `key`, returns `{:ok, value}` or `{:error, reason}`. | +| `backfill_ttl` | `pos_integer \| nil` | TTL used when backfilling layers on a slower-layer hit. Defaults to `nil` (no expiry). | + +--- + +## Cache.RefreshAhead + +Proactively refreshes values in the background before they expire. When a `get` +detects that a cached value is within the `refresh_before` window, it returns +the current (still-valid) value immediately and spawns an async `Task` to fetch +a fresh one. Only actively-read keys are refreshed — unread keys naturally expire. + +### Usage + +Define a `refresh/1` callback on your cache module: + +```elixir +defmodule MyApp.Cache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.Redis}, + name: :my_cache, + opts: [ + uri: "redis://localhost:6379", + refresh_before: :timer.seconds(30) + ] + + def refresh(key) do + {:ok, MyApp.fetch_value(key)} + end +end +``` + +### Inline Refresh Callback + +You can supply the callback directly via `on_refresh` instead of defining +`refresh/1`: + +```elixir +opts: [ + refresh_before: :timer.seconds(30), + on_refresh: fn key -> {:ok, MyApp.fetch_value(key)} end +] +``` + +### How the Refresh Window Works + +Given a value stored with `ttl = 60_000` ms and `refresh_before = 10_000` ms: + +- For the first 50 seconds, `get` returns the value with no background work. +- After 50 seconds, `get` returns the value **and** spawns a refresh task. +- The refresh task calls `refresh/1`, then writes the new value with the same TTL. +- A per-cache ETS deduplication table ensures only one refresh runs per key at + a time. +- A `:global` lock prevents the same key from being refreshed on multiple nodes + simultaneously when running in a cluster. + +### Options + +| Option | Type | Required | Description | +|---|---|---|---| +| `refresh_before` | `pos_integer` | Yes | Milliseconds before TTL expiry to trigger background refresh. | +| `on_refresh` | `fun/1` or MFA | No | Refresh callback. Falls back to `YourCacheModule.refresh/1`. | +| `lock_node_whitelist` | `atom` or `[atom]` | No | Node whitelist for distributed refresh locks. Defaults to all connected nodes. | + +--- + +## Testing Strategies + +Strategies respect the `sandbox?: true` option. When sandboxed, the strategy +layer is bypassed entirely and the `Cache.Sandbox` adapter is used directly, +giving you the same per-test isolation as regular adapters: + +```elixir +defmodule MyApp.Cache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.Redis}, + name: :my_cache, + sandbox?: true, + opts: [refresh_before: :timer.seconds(30)] +end +``` + +In your test setup: + +```elixir +setup do + Cache.SandboxRegistry.start(MyApp.Cache) + :ok +end +``` diff --git a/guides/reference/api.md b/guides/reference/api.md index 0a8e05a..8112a78 100644 --- a/guides/reference/api.md +++ b/guides/reference/api.md @@ -6,7 +6,7 @@ This document provides a comprehensive overview of the ElixirCache API and all a These functions are available on any cache module that uses the `Cache` behavior. -### put/3 +### put/2 and put/3 ```elixir put(key, value) @@ -16,7 +16,7 @@ put(key, ttl, value) Stores a value in the cache. - `key` - The key under which to store the value (atom or string) -- `ttl` - Optional time-to-live in seconds (integer) +- `ttl` - Optional time-to-live in milliseconds (e.g. `:timer.seconds(60)`). Defaults to `nil` (no expiry). - `value` - The value to store (any Elixir term) Returns `:ok` or `{:error, reason}`. diff --git a/guides/tutorials/advanced_techniques.md b/guides/tutorials/advanced_techniques.md index 27d071d..4873f7f 100644 --- a/guides/tutorials/advanced_techniques.md +++ b/guides/tutorials/advanced_techniques.md @@ -52,39 +52,41 @@ end ## Implementing Cache Stampede Protection -Cache stampede occurs when many requests try to regenerate a cached item simultaneously: +Cache stampede occurs when many requests try to regenerate a cached item simultaneously. + +> **Tip**: ElixirCache ships with `Cache.RefreshAhead`, a strategy adapter that +> handles this automatically with background refresh and per-key deduplication. +> See the [Using Strategies](../how-to/using_strategies.md) guide. + +If you need a manual approach without the strategy adapter: ```elixir defmodule MyApp.StampedeProtection do - # Stale-while-revalidate pattern def get_with_protection(key, ttl, stale_ttl, generator_fn) do case MyApp.Cache.get(key) do - # Cache hit - return value {:ok, %{value: value, timestamp: ts}} -> - now = System.system_time(:second) - - # If stale but not expired, return stale value and trigger background refresh + now = System.system_time(:millisecond) + if now - ts > stale_ttl and now - ts < ttl do - Task.start(fn -> - refresh_cache(key, ttl, generator_fn) + Task.start(fn -> + refresh_cache(key, ttl, generator_fn) end) end - + {:ok, value} - - # Cache miss - generate and store + _ -> refresh_cache(key, ttl, generator_fn) end end - + defp refresh_cache(key, ttl, generator_fn) do with {:ok, value} <- generator_fn.() do cached_value = %{ value: value, timestamp: System.system_time(:millisecond) } - + MyApp.Cache.put(key, ttl, cached_value) {:ok, value} end @@ -114,7 +116,7 @@ defmodule MyApp.UserCache do # Bulk invalidation in ETS cache def invalidate_all_users() do - if MyApp.Cache.cache_adapter() == Cache.ETS do + if MyApp.Cache.cache_adapter() === Cache.ETS do MyApp.Cache.match_delete({:"#{@prefix}_", :_}) else # Fallback for other adapters without pattern matching diff --git a/guides/tutorials/basic_operations.md b/guides/tutorials/basic_operations.md index f4518da..9c01cb1 100644 --- a/guides/tutorials/basic_operations.md +++ b/guides/tutorials/basic_operations.md @@ -24,8 +24,8 @@ To store a value in the cache: # Basic storage with no TTL (Time To Live) MyApp.Cache.put("user:1", %{name: "John", role: "admin"}) -# With a TTL of 60 seconds -MyApp.Cache.put("session:123", %{user_id: 1, authenticated: true}, 60) +# With a TTL of 60 seconds (TTL is in milliseconds) +MyApp.Cache.put("session:123", :timer.seconds(60), %{user_id: 1, authenticated: true}) ``` ### Retrieving Values diff --git a/guides/tutorials/installation.md b/guides/tutorials/installation.md index 157344a..ca9d64d 100644 --- a/guides/tutorials/installation.md +++ b/guides/tutorials/installation.md @@ -9,7 +9,7 @@ Add `:elixir_cache` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:elixir_cache, "~> 0.3.8"} + {:elixir_cache, "~> 0.4"} ] end ``` diff --git a/lib/cache.ex b/lib/cache.ex index 1bbe099..1962dec 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -34,187 +34,358 @@ defmodule Cache do @callback delete(cache_name :: atom, key :: atom | String.t()) :: :ok | ErrorMessage.t() defmacro __using__(opts) do - quote do - opts = unquote(opts) + strategy? = is_tuple(Macro.expand(opts[:adapter], __CALLER__)) - @cache_opts opts - @cache_name opts[:name] - @cache_adapter if opts[:sandbox?], do: Cache.Sandbox, else: opts[:adapter] + if strategy? do + quote do + @cache_opts unquote(opts) + @cache_name unquote(opts[:name]) + {strategy_module, strategy_config} = unquote(opts[:adapter]) + @cache_adapter strategy_module + @cache_strategy_module strategy_module + @cache_strategy_config strategy_config - if !opts[:adapter] do - raise "Must supply a cache adapter for #{__MODULE__}" - end + pre_check_runtime_options = fn + {_, _, _} = mfa -> + mfa - if !@cache_name do - raise "Must supply a cache name for #{__MODULE__}" - end + {_, _} = app_config -> + app_config - pre_check_runtime_options = fn - {_, _, _} = mfa -> - mfa + fun when is_function(fun, 0) -> + fun - {_, _} = app_config -> - app_config + app_name when is_atom(app_name) and not is_nil(app_name) -> + app_name - fun when is_function(fun, 0) -> - fun + val -> + raise ArgumentError, """ + Bad option in adapter module #{inspect(__MODULE__)}! - app_name when is_atom(app_name) and not is_nil(app_name) -> - app_name + Expected one of the following: - val -> - raise ArgumentError, """ - Bad option in adapter module #{inspect(__MODULE__)}! + * `{module, function, args}` - Module, function, args + * `{application_name, key}` - Application name. This is called as `Application.fetch_env!(application_name, key)`. + * `application_name` - Application name as an atom. This is called as `Application.fetch_env!(application_name, #{inspect(__MODULE__)})`. + * `function` - Zero arity callback function. For eg. `&YourModule.options/0` + * `[key: value_type]` - Keyword list of options. - Expected one of the following: + Got: #{inspect(val)} + """ + end - * `{module, function, args}` - Module, function, args - * `{application_name, key}` - Application name. This is called as `Application.fetch_env!(application_name, key)`. - * `application_name` - Application name as an atom. This is called as `Application.fetch_env!(application_name, #{inspect(__MODULE__)})`. - * `function` - Zero arity callback function. For eg. `&YourModule.options/0` - * `[key: value_type]` - Keyword list of options. + check_adapter_opts = fn + adapter_opts when is_list(adapter_opts) -> + strategy_keys = Keyword.keys(@cache_strategy_module.opts_definition()) + strategy_opts = Keyword.take(adapter_opts, strategy_keys) + NimbleOptions.validate!(strategy_opts, @cache_strategy_module.opts_definition()) + adapter_opts - Got: #{inspect(val)} - """ - end + adapter_opts -> + pre_check_runtime_options.(adapter_opts) + end - check_adapter_opts = fn - adapter_opts when is_list(adapter_opts) -> - NimbleOptions.validate!(adapter_opts, @cache_adapter.opts_definition()) + adapter_opts = check_adapter_opts.(unquote(opts[:opts])) - adapter_opts -> - pre_check_runtime_options.(adapter_opts) - end + @adapter_opts adapter_opts + @compression_level if is_list(@adapter_opts), do: @adapter_opts[:compression_level] - adapter_opts = if opts[:sandbox?], do: [], else: check_adapter_opts.(opts[:opts]) + def cache_name, do: @cache_name + def cache_adapter, do: @cache_adapter - @adapter_opts adapter_opts - @compression_level if is_list(@adapter_opts), do: @adapter_opts[:compression_level] + def adapter_options do + opts = adapter_options!(@adapter_opts) + if is_list(opts), do: Keyword.put_new(opts, :__cache_module__, __MODULE__), else: opts + end - if macro_exported?(unquote(opts[:adapter]), :__using__, 1) do - use unquote(opts[:adapter]) - end + if match?({_, _, _}, @adapter_opts) do + defp adapter_options!({module, fun, args}), do: apply(module, fun, args) + end - def cache_name, do: @cache_name - def cache_adapter, do: @cache_adapter + if match?({_, _}, @adapter_opts) do + defp adapter_options!({app, key}), do: Application.fetch_env!(app, key) + end - def adapter_options, do: adapter_options!(@adapter_opts) + if is_atom(@adapter_opts) and not is_nil(@adapter_opts) and not is_list(@adapter_opts) do + defp adapter_options!(app_name) when is_atom(app_name), + do: Application.fetch_env!(app_name, __MODULE__) + end - # Generate only the needed adapter_options!/1 clauses based on the actual adapter_opts - if match?({_, _, _}, @adapter_opts) do - defp adapter_options!({module, fun, args}), do: apply(module, fun, args) - end + if is_function(@adapter_opts, 0) do + defp adapter_options!(fun) when is_function(fun, 0), do: fun.() + end - if match?({_, _}, @adapter_opts) do - defp adapter_options!({app, key}), do: Application.fetch_env!(app, key) - end + defp adapter_options!(options), do: options - if is_atom(@adapter_opts) and not is_nil(@adapter_opts) and not is_list(@adapter_opts) do - defp adapter_options!(app_name) when is_atom(app_name), - do: Application.fetch_env!(app_name, __MODULE__) - end + defp handle_adapter_result(result, operation, cache_name) do + with {:error, error} <- result do + :telemetry.execute( + [:elixir_cache, :cache, operation, :error], + %{count: 1}, + %{ + cache_name: cache_name, + error: error + } + ) - if is_function(@adapter_opts, 0) do - defp adapter_options!(fun) when is_function(fun, 0), do: fun.() - end + result + end + end + + def child_spec(_) do + @cache_strategy_module.child_spec({@cache_name, @cache_strategy_config, adapter_options()}) + end + + def put(key, ttl \\ nil, value) do + key = maybe_sandbox_key(key) - defp adapter_options!(options), do: options - - # Dynamic error handler that compiler can't statically analyze - defp handle_adapter_result(result, operation, cache_name) do - with {:error, error} <- result do - :telemetry.execute( - [:elixir_cache, :cache, operation, :error], - %{count: 1}, - %{ - cache_name: cache_name, - error: error - } + :telemetry.span( + [:elixir_cache, :cache, :put], + %{cache_name: @cache_name}, + fn -> + @cache_name + |> @cache_strategy_module.put(key, ttl, value, @cache_strategy_config, adapter_options()) + |> handle_adapter_result(:put, @cache_name) + |> then(&{&1, %{cache_name: @cache_name}}) + end ) + end - result + def get(key) do + key = maybe_sandbox_key(key) + + :telemetry.span( + [:elixir_cache, :cache, :get], + %{cache_name: @cache_name}, + fn -> + result = + @cache_name + |> @cache_strategy_module.get(key, @cache_strategy_config, adapter_options()) + |> handle_adapter_result(:get, @cache_name) + |> case do + {:ok, nil} = res -> + :telemetry.execute([:elixir_cache, :cache, :get, :miss], %{count: 1}, %{ + cache_name: @cache_name + }) + + res + + {:ok, _} = res -> + res + + {:error, _} = error -> + error + end + + {result, %{cache_name: @cache_name}} + end + ) end - end - def child_spec(_) do - @cache_adapter.child_spec({@cache_name, adapter_options()}) - end + def delete(key) do + key = maybe_sandbox_key(key) - def put(key, ttl \\ nil, value) do - value = Cache.TermEncoder.encode(value, @compression_level) - key = maybe_sandbox_key(key) - - :telemetry.span( - [:elixir_cache, :cache, :put], - %{cache_name: @cache_name}, - fn -> - @cache_name - |> @cache_adapter.put(key, ttl, value, adapter_options()) - |> handle_adapter_result(:put, @cache_name) - |> then(&{&1, %{cache_name: @cache_name}}) - end - ) + :telemetry.span( + [:elixir_cache, :cache, :delete], + %{cache_name: @cache_name}, + fn -> + @cache_name + |> @cache_strategy_module.delete(key, @cache_strategy_config, adapter_options()) + |> handle_adapter_result(:delete, @cache_name) + |> then(&{&1, %{cache_name: @cache_name}}) + end + ) + end + + def get_or_create(key, fnc) do + Cache.get_or_create(__MODULE__, key, fnc) + end + + defp maybe_sandbox_key(key), do: key end + else + quote do + opts = unquote(opts) - def get(key) do - key = maybe_sandbox_key(key) + @cache_opts opts + @cache_name opts[:name] + @cache_adapter if opts[:sandbox?], do: Cache.Sandbox, else: opts[:adapter] - :telemetry.span( - [:elixir_cache, :cache, :get], - %{cache_name: @cache_name}, - fn -> - result = - @cache_name - |> @cache_adapter.get(key, adapter_options()) - |> handle_adapter_result(:get, @cache_name) - |> case do - {:ok, nil} = res -> - :telemetry.execute([:elixir_cache, :cache, :get, :miss], %{count: 1}, %{ - cache_name: @cache_name - }) + if !opts[:adapter] do + raise "Must supply a cache adapter for #{__MODULE__}" + end + + if !@cache_name do + raise "Must supply a cache name for #{__MODULE__}" + end - res + pre_check_runtime_options = fn + {_, _, _} = mfa -> + mfa - {:ok, value} -> - {:ok, Cache.TermEncoder.decode(value)} + {_, _} = app_config -> + app_config - {:error, _} = error -> - error - end + fun when is_function(fun, 0) -> + fun - {result, %{cache_name: @cache_name}} - end - ) - end + app_name when is_atom(app_name) and not is_nil(app_name) -> + app_name + + val -> + raise ArgumentError, """ + Bad option in adapter module #{inspect(__MODULE__)}! + + Expected one of the following: - def delete(key) do - key = maybe_sandbox_key(key) - - :telemetry.span( - [:elixir_cache, :cache, :delete], - %{cache_name: @cache_name}, - fn -> - @cache_name - |> @cache_adapter.delete(key, adapter_options()) - |> handle_adapter_result(:delete, @cache_name) - |> then(&{&1, %{cache_name: @cache_name}}) + * `{module, function, args}` - Module, function, args + * `{application_name, key}` - Application name. This is called as `Application.fetch_env!(application_name, key)`. + * `application_name` - Application name as an atom. This is called as `Application.fetch_env!(application_name, #{inspect(__MODULE__)})`. + * `function` - Zero arity callback function. For eg. `&YourModule.options/0` + * `[key: value_type]` - Keyword list of options. + + Got: #{inspect(val)} + """ + end + + check_adapter_opts = fn + adapter_opts when is_list(adapter_opts) -> + NimbleOptions.validate!(adapter_opts, @cache_adapter.opts_definition()) + + adapter_opts -> + pre_check_runtime_options.(adapter_opts) + end + + adapter_opts = if opts[:sandbox?], do: [], else: check_adapter_opts.(opts[:opts]) + + @adapter_opts adapter_opts + @compression_level if is_list(@adapter_opts), do: @adapter_opts[:compression_level] + + if macro_exported?(unquote(opts[:adapter]), :__using__, 1) do + use unquote(opts[:adapter]) + end + + def cache_name, do: @cache_name + def cache_adapter, do: @cache_adapter + + def adapter_options, do: adapter_options!(@adapter_opts) + + # Generate only the needed adapter_options!/1 clauses based on the actual adapter_opts + if match?({_, _, _}, @adapter_opts) do + defp adapter_options!({module, fun, args}), do: apply(module, fun, args) + end + + if match?({_, _}, @adapter_opts) do + defp adapter_options!({app, key}), do: Application.fetch_env!(app, key) + end + + if is_atom(@adapter_opts) and not is_nil(@adapter_opts) and not is_list(@adapter_opts) do + defp adapter_options!(app_name) when is_atom(app_name), + do: Application.fetch_env!(app_name, __MODULE__) + end + + if is_function(@adapter_opts, 0) do + defp adapter_options!(fun) when is_function(fun, 0), do: fun.() + end + + defp adapter_options!(options), do: options + + # Dynamic error handler that compiler can't statically analyze + defp handle_adapter_result(result, operation, cache_name) do + with {:error, error} <- result do + :telemetry.execute( + [:elixir_cache, :cache, operation, :error], + %{count: 1}, + %{ + cache_name: cache_name, + error: error + } + ) + + result end - ) - end + end - def get_or_create(key, fnc) do - Cache.get_or_create(__MODULE__, key, fnc) - end + def child_spec(_) do + @cache_adapter.child_spec({@cache_name, adapter_options()}) + end + + def put(key, ttl \\ nil, value) do + value = Cache.TermEncoder.encode(value, @compression_level) + key = maybe_sandbox_key(key) + + :telemetry.span( + [:elixir_cache, :cache, :put], + %{cache_name: @cache_name}, + fn -> + @cache_name + |> @cache_adapter.put(key, ttl, value, adapter_options()) + |> handle_adapter_result(:put, @cache_name) + |> then(&{&1, %{cache_name: @cache_name}}) + end + ) + end + + def get(key) do + key = maybe_sandbox_key(key) + + :telemetry.span( + [:elixir_cache, :cache, :get], + %{cache_name: @cache_name}, + fn -> + result = + @cache_name + |> @cache_adapter.get(key, adapter_options()) + |> handle_adapter_result(:get, @cache_name) + |> case do + {:ok, nil} = res -> + :telemetry.execute([:elixir_cache, :cache, :get, :miss], %{count: 1}, %{ + cache_name: @cache_name + }) + + res + + {:ok, value} -> + {:ok, Cache.TermEncoder.decode(value)} + + {:error, _} = error -> + error + end + + {result, %{cache_name: @cache_name}} + end + ) + end + + def delete(key) do + key = maybe_sandbox_key(key) - if @cache_opts[:sandbox?] do - defp maybe_sandbox_key(key) do - sandbox_id = Cache.SandboxRegistry.find!(__MODULE__) + :telemetry.span( + [:elixir_cache, :cache, :delete], + %{cache_name: @cache_name}, + fn -> + @cache_name + |> @cache_adapter.delete(key, adapter_options()) + |> handle_adapter_result(:delete, @cache_name) + |> then(&{&1, %{cache_name: @cache_name}}) + end + ) + end - "#{sandbox_id}:#{key}" + def get_or_create(key, fnc) do + Cache.get_or_create(__MODULE__, key, fnc) end - else - defp maybe_sandbox_key(key) do - key + + if @cache_opts[:sandbox?] do + defp maybe_sandbox_key(key) do + sandbox_id = Cache.SandboxRegistry.find!(__MODULE__) + + "#{sandbox_id}:#{key}" + end + else + defp maybe_sandbox_key(key) do + key + end end end end diff --git a/lib/cache/counter.ex b/lib/cache/counter.ex new file mode 100644 index 0000000..8301af6 --- /dev/null +++ b/lib/cache/counter.ex @@ -0,0 +1,206 @@ +defmodule Cache.Counter do + @opts_definition [ + initial_size: [ + type: :pos_integer, + default: 16, + doc: "Initial number of counter slots to pre-allocate" + ] + ] + + @moduledoc """ + Atomic integer counter adapter backed by Erlang's `:counters` module. + + Counter values are stored in a lock-free `:counters` array. The array reference + and the key-to-index mapping are stored in `:persistent_term` so all processes + can access them without a process round-trip. + + ## Behaviour + + - `put/4` accepts only `1` or `-1` as values, acting as increment or decrement. + Any other value returns an error. + - `get/2` returns the current integer value for a key, or `nil` if the key is unknown. + - `delete/2` removes a key from the index map and zeroes its counter slot. + - `increment/1,2` and `decrement/1,2` are injected into consumer modules via `use`. + + ## Options + #{NimbleOptions.docs(@opts_definition)} + + ## Example + + ```elixir + defmodule MyApp.Cache do + use Cache, + adapter: Cache.Counter, + name: :my_app_counter_cache, + opts: [initial_size: 32] + end + + MyApp.Cache.increment(:page_views) + MyApp.Cache.decrement(:active_users) + {:ok, count} = MyApp.Cache.get(:page_views) + ``` + """ + + use Task, restart: :permanent + + @behaviour Cache + + @ref_key :__counter_ref__ + @index_map_key :__counter_index_map__ + + defmacro __using__(_opts) do + quote do + @doc """ + Atomically increments the counter for the given key by `step` (default 1). + """ + def increment(key, step \\ 1) do + key = maybe_sandbox_key(key) + @cache_adapter.increment(@cache_name, key, step) + end + + @doc """ + Atomically decrements the counter for the given key by `step` (default 1). + """ + def decrement(key, step \\ 1) do + key = maybe_sandbox_key(key) + @cache_adapter.decrement(@cache_name, key, step) + end + end + end + + @impl Cache + def opts_definition, do: @opts_definition + + @impl Cache + def start_link(opts) do + Task.start_link(fn -> + cache_name = opts[:table_name] + initial_size = opts[:initial_size] || 16 + ref = :counters.new(initial_size, [:atomics]) + :persistent_term.put({cache_name, @ref_key}, ref) + :persistent_term.put({cache_name, @index_map_key}, %{}) + Process.hibernate(Function, :identity, [nil]) + end) + end + + @impl Cache + def child_spec({cache_name, opts}) do + %{ + id: "#{cache_name}_elixir_cache_counter", + start: {Cache.Counter, :start_link, [Keyword.put(opts, :table_name, cache_name)]} + } + end + + @impl Cache + @spec get(atom, atom | String.t(), Keyword.t()) :: ErrorMessage.t_res(integer | nil) + def get(cache_name, key, _opts \\ []) do + index_map = get_index_map(cache_name) + + case Map.get(index_map, key) do + nil -> + {:ok, nil} + + index -> + ref = get_ref(cache_name) + {:ok, :counters.get(ref, index)} + end + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + @impl Cache + @spec put(atom, atom | String.t(), pos_integer | nil, 1 | -1, Keyword.t()) :: + :ok | ErrorMessage.t() + def put(cache_name, key, ttl \\ nil, value, opts \\ []) + + def put(cache_name, key, _ttl, value, _opts) when value in [1, -1] do + {index, ref} = get_or_create_index(cache_name, key) + :counters.add(ref, index, value) + :ok + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + def put(_cache_name, _key, _ttl, value, _opts) do + {:error, + ErrorMessage.unprocessable_entity( + "put/4 value must be 1 or -1 for Cache.Counter, got: #{inspect(value)}" + )} + end + + @impl Cache + @spec delete(atom, atom | String.t(), Keyword.t()) :: :ok | ErrorMessage.t() + def delete(cache_name, key, _opts \\ []) do + index_map = get_index_map(cache_name) + + case Map.get(index_map, key) do + nil -> + :ok + + index -> + ref = get_ref(cache_name) + :counters.put(ref, index, 0) + updated_map = Map.delete(index_map, key) + :persistent_term.put({cache_name, @index_map_key}, updated_map) + :ok + end + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + @spec increment(atom, atom | String.t(), pos_integer) :: :ok | ErrorMessage.t() + def increment(cache_name, key, step \\ 1) do + {index, ref} = get_or_create_index(cache_name, key) + :counters.add(ref, index, step) + :ok + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + @spec decrement(atom, atom | String.t(), pos_integer) :: :ok | ErrorMessage.t() + def decrement(cache_name, key, step \\ 1) do + {index, ref} = get_or_create_index(cache_name, key) + :counters.add(ref, index, -step) + :ok + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + defp get_ref(cache_name) do + :persistent_term.get({cache_name, @ref_key}) + end + + defp get_index_map(cache_name) do + :persistent_term.get({cache_name, @index_map_key}, %{}) + end + + defp get_or_create_index(cache_name, key) do + index_map = get_index_map(cache_name) + + case Map.get(index_map, key) do + nil -> + old_ref = get_ref(cache_name) + old_size = :counters.info(old_ref).size + new_size = old_size + 1 + new_ref = :counters.new(new_size, [:atomics]) + + Enum.each(1..old_size, fn index -> + :counters.put(new_ref, index, :counters.get(old_ref, index)) + end) + + new_index = new_size + updated_map = Map.put(index_map, key, new_index) + :persistent_term.put({cache_name, @ref_key}, new_ref) + :persistent_term.put({cache_name, @index_map_key}, updated_map) + {new_index, new_ref} + + index -> + {index, get_ref(cache_name)} + end + end +end diff --git a/lib/cache/ets.ex b/lib/cache/ets.ex index 2c4039a..7ef52df 100644 --- a/lib/cache/ets.ex +++ b/lib/cache/ets.ex @@ -74,6 +74,8 @@ defmodule Cache.ETS do ``` """ + @compile {:no_warn_undefined, :ets} + use Task, restart: :permanent @behaviour Cache diff --git a/lib/cache/hash_ring.ex b/lib/cache/hash_ring.ex new file mode 100644 index 0000000..53b6443 --- /dev/null +++ b/lib/cache/hash_ring.ex @@ -0,0 +1,289 @@ +defmodule Cache.HashRing do + @moduledoc """ + Consistent hash ring strategy adapter using `libring`. + + This strategy distributes cache keys across Erlang cluster nodes using a + consistent hash ring. When a key is hashed to the local node, the operation + is executed directly. When it hashes to a remote node, the operation is + forwarded via a configurable RPC module (defaults to `:erpc`). + + The ring automatically tracks Erlang node membership using + `HashRing.Managed` with `monitor_nodes: true`, so nodes joining or leaving + the cluster are reflected in the ring automatically. + + ## Usage + + ```elixir + defmodule MyApp.DistributedCache do + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :distributed_cache, + opts: [read_concurrency: true] + end + ``` + + ## Options + + #{NimbleOptions.docs([ + ring_opts: [ + type: :keyword_list, + doc: "Options passed to `HashRing.Worker`, such as `node_blacklist` and `node_whitelist`.", + default: [] + ], + node_weight: [ + type: :pos_integer, + doc: "Number of virtual nodes (shards) per node on the ring. Higher values give more even distribution.", + default: 128 + ], + rpc_module: [ + type: :atom, + doc: "Module used for remote calls. Must implement `call/4` with the same signature as `:erpc.call/4`.", + default: :erpc + ] + ])} + + ## How It Works + + Each node in the cluster starts the same underlying adapter locally. When a + cache operation is performed: + + 1. The key is hashed to determine which node owns it via the consistent ring. + 2. If the owning node is `Node.self()`, the operation is executed locally. + 3. If the owning node is a remote node, the operation is forwarded via the + configured `rpc_module` (default `:erpc`). + + This ensures that each key is always stored on the same node (with the same + ring configuration), enabling efficient distributed caching without a + centralised store. + + ## Read-Repair + + When the ring topology changes (node up/down), some keys will hash to a + different node. `Cache.HashRing.RingMonitor` snapshots the ring before each + change, keeping up to `ring_history_size` previous rings. + + On a `get` miss, the previous rings are consulted in order (newest first). + For each previous ring, if the key hashed to a different (live) node, a + `get` is attempted there. On a hit: + + 1. The value is returned immediately. + 2. It is written to the current owning node (migration). + 3. It is deleted from the old node asynchronously. + + This lazily migrates hot keys after rebalancing without scanning the ring. + + > **Note**: When `sandbox?: true`, the ring is bypassed and all operations + > are executed locally against the sandbox adapter. + """ + + @behaviour Cache.Strategy + + @strategy_keys [:ring_opts, :node_weight, :rpc_module, :ring_history_size, :__cache_module__] + + @opts_definition [ + ring_opts: [ + type: :keyword_list, + doc: "Options passed to HashRing.Worker.", + default: [] + ], + node_weight: [ + type: :pos_integer, + doc: "Number of virtual nodes (shards) per node on the ring.", + default: 128 + ], + rpc_module: [ + type: :atom, + doc: "Module used for remote calls (must implement call/4).", + default: :erpc + ], + ring_history_size: [ + type: :pos_integer, + doc: "Number of previous ring snapshots to keep for read-repair fallback.", + default: 3 + ] + ] + + @impl Cache.Strategy + def opts_definition, do: @opts_definition + + @impl Cache.Strategy + def child_spec({cache_name, underlying_adapter, adapter_opts}) do + ring_name = ring_name(cache_name) + user_ring_opts = adapter_opts[:ring_opts] || [] + ring_opts = Keyword.merge([monitor_nodes: true], user_ring_opts) + underlying_opts = validate_underlying_opts(underlying_adapter, Keyword.drop(adapter_opts, @strategy_keys)) + + managed_ring_spec = %{ + id: ring_name, + type: :worker, + start: {HashRing.Worker, :start_link, [[{:name, ring_name} | ring_opts]]} + } + + ring_monitor_spec = %{ + id: :"#{cache_name}_ring_monitor", + start: + {Cache.HashRing.RingMonitor, :start_link, + [ + [ + cache_name: cache_name, + ring_name: ring_name, + history_size: adapter_opts[:ring_history_size] || 3, + node_blacklist: user_ring_opts[:node_blacklist] || [~r/^remsh.*$/, ~r/^rem-.*$/], + node_whitelist: user_ring_opts[:node_whitelist] || [] + ] + ]} + } + + %{ + id: :"#{cache_name}_hash_ring_supervisor", + type: :supervisor, + start: + {Supervisor, :start_link, + [ + [ + underlying_adapter.child_spec({cache_name, underlying_opts}), + managed_ring_spec, + ring_monitor_spec + ], + [strategy: :one_for_one] + ]} + } + end + + @impl Cache.Strategy + def get(cache_name, key, underlying_adapter, adapter_opts) do + target_node = key_to_node(cache_name, key) + rpc = adapter_opts[:rpc_module] || :erpc + underlying_opts = validate_underlying_opts(underlying_adapter, Keyword.drop(adapter_opts, @strategy_keys)) + + result = + if target_node === Node.self() do + underlying_adapter.get(cache_name, key, underlying_opts) + else + rpc.call(target_node, underlying_adapter, :get, [cache_name, key, underlying_opts]) + end + + case result do + {:ok, nil} -> + read_repair(cache_name, key, target_node, underlying_adapter, underlying_opts, rpc) + + {:ok, encoded} -> + {:ok, Cache.TermEncoder.decode(encoded)} + + {:error, _} = error -> + error + end + end + + @impl Cache.Strategy + def put(cache_name, key, ttl, value, underlying_adapter, adapter_opts) do + target_node = key_to_node(cache_name, key) + rpc = adapter_opts[:rpc_module] || :erpc + underlying_opts = validate_underlying_opts(underlying_adapter, Keyword.drop(adapter_opts, @strategy_keys)) + encoded = Cache.TermEncoder.encode(value, underlying_opts[:compression_level]) + + if target_node === Node.self() do + underlying_adapter.put(cache_name, key, ttl, encoded, underlying_opts) + else + rpc.call(target_node, underlying_adapter, :put, [cache_name, key, ttl, encoded, underlying_opts]) + end + end + + @impl Cache.Strategy + def delete(cache_name, key, underlying_adapter, adapter_opts) do + target_node = key_to_node(cache_name, key) + rpc = adapter_opts[:rpc_module] || :erpc + underlying_opts = validate_underlying_opts(underlying_adapter, Keyword.drop(adapter_opts, @strategy_keys)) + + if target_node === Node.self() do + underlying_adapter.delete(cache_name, key, underlying_opts) + else + rpc.call(target_node, underlying_adapter, :delete, [cache_name, key, underlying_opts]) + end + end + + defp read_repair(cache_name, key, current_node, underlying_adapter, underlying_opts, rpc) do + previous_rings = Cache.HashRing.RingMonitor.previous_rings(cache_name) + + result = + Enum.reduce_while(previous_rings, {:not_found, MapSet.new()}, fn ring, {:not_found, tried} -> + old_node = HashRing.key_to_node(ring, key) + + cond do + old_node === current_node -> + {:cont, {:not_found, tried}} + + MapSet.member?(tried, old_node) -> + {:cont, {:not_found, tried}} + + true -> + case rpc_get(rpc, old_node, underlying_adapter, cache_name, key, underlying_opts) do + {:ok, nil} -> + {:cont, {:not_found, MapSet.put(tried, old_node)}} + + {:ok, encoded} -> + {:halt, {:found, encoded, old_node}} + + :unavailable -> + {:cont, {:not_found, MapSet.put(tried, old_node)}} + end + end + end) + + case result do + {:found, encoded, old_node} -> + value = Cache.TermEncoder.decode(encoded) + migrate_value(cache_name, key, value, current_node, old_node, underlying_adapter, underlying_opts, rpc) + {:ok, value} + + {:not_found, _tried} -> + {:ok, nil} + end + end + + defp migrate_value(cache_name, key, value, current_node, old_node, underlying_adapter, underlying_opts, rpc) do + encoded = Cache.TermEncoder.encode(value, underlying_opts[:compression_level]) + + if current_node === Node.self() do + underlying_adapter.put(cache_name, key, nil, encoded, underlying_opts) + else + rpc.call(current_node, underlying_adapter, :put, [cache_name, key, nil, encoded, underlying_opts]) + end + + Task.start(fn -> + rpc.call(old_node, underlying_adapter, :delete, [cache_name, key, underlying_opts]) + end) + end + + defp rpc_get(rpc, node, adapter, cache_name, key, opts) do + case rpc.call(node, adapter, :get, [cache_name, key, opts]) do + {:ok, _} = result -> result + {:error, _} -> :unavailable + {:badrpc, _} -> :unavailable + end + rescue + _ -> :unavailable + catch + :exit, _ -> :unavailable + end + + defp key_to_node(cache_name, key) do + ring = ring_name(cache_name) + + case HashRing.Managed.key_to_node(ring, key) do + {:error, {:invalid_ring, :no_nodes}} -> Node.self() + {:error, :no_such_ring} -> Node.self() + node -> node + end + end + + defp validate_underlying_opts(adapter, opts) do + if Code.ensure_loaded?(adapter) and function_exported?(adapter, :opts_definition, 0) do + NimbleOptions.validate!(opts, adapter.opts_definition()) + else + opts + end + end + + defp ring_name(cache_name), do: :"#{cache_name}_hash_ring" +end diff --git a/lib/cache/hash_ring/ring_monitor.ex b/lib/cache/hash_ring/ring_monitor.ex new file mode 100644 index 0000000..433250f --- /dev/null +++ b/lib/cache/hash_ring/ring_monitor.ex @@ -0,0 +1,84 @@ +defmodule Cache.HashRing.RingMonitor do + @moduledoc false + + use GenServer + + @libring_ets_prefix "libring_" + + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @spec previous_rings(atom()) :: [HashRing.t()] + def previous_rings(cache_name) do + table = history_table_name(cache_name) + + case :ets.whereis(table) do + :undefined -> + [] + + _ -> + case :ets.lookup(table, :previous_rings) do + [{:previous_rings, rings}] -> rings + [] -> [] + end + end + end + + @impl GenServer + def init(opts) do + cache_name = Keyword.fetch!(opts, :cache_name) + ring_name = Keyword.fetch!(opts, :ring_name) + history_size = Keyword.get(opts, :history_size, 3) + node_blacklist = Keyword.get(opts, :node_blacklist, [~r/^remsh.*$/, ~r/^rem-.*$/]) + node_whitelist = Keyword.get(opts, :node_whitelist, []) + + table = :ets.new(history_table_name(cache_name), [:set, :public, :named_table]) + :ets.insert(table, {:previous_rings, []}) + + :ok = :net_kernel.monitor_nodes(true, node_type: :all) + + {:ok, + %{ + table: table, + ring_name: ring_name, + history_size: history_size, + node_blacklist: node_blacklist, + node_whitelist: node_whitelist + }} + end + + @impl GenServer + def handle_info({event, node, _info}, state) + when event in [:nodeup, :nodedown] do + unless HashRing.Utils.ignore_node?(node, state.node_blacklist, state.node_whitelist) do + snapshot_ring(state) + end + + {:noreply, state} + end + + def handle_info(_msg, state), do: {:noreply, state} + + defp snapshot_ring(%{table: table, ring_name: ring_name, history_size: history_size}) do + libring_table = :"#{@libring_ets_prefix}#{ring_name}" + + with [{:ring, current_ring}] <- safe_ets_lookup(libring_table, :ring) do + [{:previous_rings, existing}] = :ets.lookup(table, :previous_rings) + + updated = Enum.take([current_ring | existing], history_size) + + :ets.insert(table, {:previous_rings, updated}) + end + end + + defp safe_ets_lookup(table, key) do + :ets.lookup(table, key) + rescue + _ -> [] + end + + @spec history_table_name(atom()) :: atom() + def history_table_name(cache_name), do: :"#{cache_name}_ring_history" +end diff --git a/lib/cache/multi_layer.ex b/lib/cache/multi_layer.ex new file mode 100644 index 0000000..013ec7f --- /dev/null +++ b/lib/cache/multi_layer.ex @@ -0,0 +1,224 @@ +defmodule Cache.MultiLayer do + @moduledoc """ + Multi-layer caching strategy that cascades through multiple cache layers. + + Keys are read from fastest to slowest, with automatic backfill on cache hits + from slower layers. Writes go slowest-first to avoid polluting fast layers + with data that failed to persist in slow ones. + + ## Usage + + Pass a list of layers as the strategy config. Each element can be: + + - A module that implements `Cache` (already running, not supervised by this adapter) + - An adapter module (e.g. `Cache.ETS`) — will be auto-started and supervised + - A tuple `{AdapterModule, opts}` — adapter with inline opts + + ```elixir + defmodule MyApp.LayeredCache do + use Cache, + adapter: {Cache.MultiLayer, [Cache.ETS, MyApp.RedisCache]}, + name: :layered_cache, + opts: [] + end + ``` + + ## `__MODULE__` in Layers + + You may include `__MODULE__` in the layer list to position the current + module's own underlying cache within the chain. If `__MODULE__` is omitted, + no local cache is created for the defining module—it acts as a pure facade. + + ```elixir + defmodule MyApp.LayeredCache do + use Cache, + adapter: {Cache.MultiLayer, [Cache.ETS, __MODULE__, MyApp.RedisCache]}, + name: :layered_cache, + opts: [uri: "redis://localhost"] + end + ``` + + ## Read Behaviour + + Layers are iterated fastest → slowest (list order). On a hit from layer N, + the value is backfilled into layers 1..N-1. + + ## Write Behaviour + + Layers are written slowest → fastest (reverse list order). If a slow write + fails, the write stops and an error is returned — preventing polluting faster + layers with potentially-unsaved data. + + ## Fetch Callback (Optional) + + If all layers miss, an optional fetch callback can supply the value. The + fetched value is then backfilled into all layers. + + Define it as a module callback or pass it via opts: + + ```elixir + defmodule MyApp.LayeredCache do + use Cache, + adapter: {Cache.MultiLayer, [Cache.ETS, MyApp.RedisCache]}, + name: :layered_cache, + opts: [on_fetch: &__MODULE__.fetch/1] + + def fetch(key) do + {:ok, "value_for_\#{key}"} + end + end + ``` + + ## Options + + #{NimbleOptions.docs([ + on_fetch: [ + type: {:or, [:mfa, {:fun, 1}]}, + doc: "Optional fetch callback invoked on total cache miss. Receives the key, returns `{:ok, value}` or `{:error, reason}`." + ], + backfill_ttl: [ + type: {:or, [:pos_integer, nil]}, + doc: "TTL in milliseconds to use when backfilling layers on a hit from a slower layer. Defaults to nil (no expiry)." + ] + ])} + """ + + @behaviour Cache.Strategy + + @opts_definition [ + on_fetch: [ + type: {:or, [:mfa, {:fun, 1}]}, + doc: "Optional fetch callback for cache miss." + ], + backfill_ttl: [ + type: {:or, [:pos_integer, nil]}, + doc: "TTL for backfilled entries." + ] + ] + + @impl Cache.Strategy + def opts_definition, do: @opts_definition + + @impl Cache.Strategy + def child_spec({cache_name, _layers, _adapter_opts}) do + %{ + id: :"#{cache_name}_multi_layer", + start: {Agent, :start_link, [fn -> :ok end, [name: :"#{cache_name}_multi_layer"]]} + } + end + + @impl Cache.Strategy + def get(cache_name, key, layers, adapter_opts) do + backfill_ttl = adapter_opts[:backfill_ttl] + + case get_from_layers(cache_name, key, layers, adapter_opts, []) do + {:hit, value, layers_to_backfill} -> + backfill_layers(cache_name, key, layers_to_backfill, value, backfill_ttl) + {:ok, value} + + :miss -> + fetch_on_miss(cache_name, key, layers, adapter_opts) + end + end + + @impl Cache.Strategy + def put(cache_name, key, ttl, value, layers, adapter_opts) do + reversed = Enum.reverse(layers) + put_to_layers(cache_name, key, ttl, value, reversed, adapter_opts) + end + + @impl Cache.Strategy + def delete(cache_name, key, layers, _adapter_opts) do + Enum.reduce_while(layers, :ok, fn layer, _acc -> + case layer_delete(cache_name, key, layer) do + :ok -> {:cont, :ok} + {:error, _} = error -> {:halt, error} + end + end) + end + + defp get_from_layers(_cache_name, _key, [], _adapter_opts, _visited), do: :miss + + defp get_from_layers(cache_name, key, [layer | rest], adapter_opts, visited) do + case layer_get(cache_name, key, layer) do + {:ok, nil} -> + get_from_layers(cache_name, key, rest, adapter_opts, [layer | visited]) + + {:ok, value} -> + {:hit, value, visited} + + {:error, _} -> + get_from_layers(cache_name, key, rest, adapter_opts, [layer | visited]) + end + end + + defp fetch_on_miss(cache_name, key, layers, adapter_opts) do + on_fetch = adapter_opts[:on_fetch] + + if is_nil(on_fetch) do + {:ok, nil} + else + case invoke_callback(on_fetch, [key]) do + {:ok, value} -> + backfill_ttl = adapter_opts[:backfill_ttl] + backfill_layers(cache_name, key, layers, value, backfill_ttl) + {:ok, value} + + {:error, _} = error -> + error + end + end + end + + defp put_to_layers(_cache_name, _key, _ttl, _value, [], _adapter_opts), do: :ok + + defp put_to_layers(cache_name, key, ttl, value, [layer | rest], adapter_opts) do + case layer_put(cache_name, key, ttl, value, layer) do + :ok -> put_to_layers(cache_name, key, ttl, value, rest, adapter_opts) + {:error, _} = error -> error + end + end + + defp backfill_layers(_cache_name, _key, [], _value, _ttl), do: :ok + + defp backfill_layers(cache_name, key, [layer | rest], value, ttl) do + layer_put(cache_name, key, ttl, value, layer) + backfill_layers(cache_name, key, rest, value, ttl) + end + + defp layer_get(_cache_name, key, layer) when is_atom(layer) do + if cache_module?(layer) do + layer.get(key) + else + {:ok, nil} + end + end + + defp layer_put(_cache_name, key, ttl, value, layer) when is_atom(layer) do + if cache_module?(layer) do + layer.put(key, ttl, value) + else + :ok + end + end + + defp layer_delete(_cache_name, key, layer) when is_atom(layer) do + if cache_module?(layer) do + layer.delete(key) + else + :ok + end + end + + defp cache_module?(module) do + function_exported?(module, :get, 1) and function_exported?(module, :put, 2) + end + + defp invoke_callback({module, function, args}, extra_args) do + apply(module, function, args ++ extra_args) + end + + defp invoke_callback(fun, args) when is_function(fun) do + apply(fun, args) + end +end diff --git a/lib/cache/persistent_term.ex b/lib/cache/persistent_term.ex new file mode 100644 index 0000000..8274220 --- /dev/null +++ b/lib/cache/persistent_term.ex @@ -0,0 +1,74 @@ +defmodule Cache.PersistentTerm do + @moduledoc """ + `:persistent_term` adapter for storing rarely-written, frequently-read cached values. + + This adapter stores values in Erlang's `:persistent_term` storage, which provides + extremely fast read access at the cost of more expensive writes and deletes. + It is best suited for configuration values or other data that changes infrequently. + + TTL is not supported — values persist until explicitly deleted. + + ## Example + + ```elixir + defmodule MyApp.Cache do + use Cache, + adapter: Cache.PersistentTerm, + name: :my_app_persistent_cache, + opts: [] + end + ``` + """ + + use Task, restart: :permanent + + @behaviour Cache + + @impl Cache + def opts_definition, do: [] + + @impl Cache + def start_link(opts) do + Task.start_link(fn -> + _table_name = opts[:table_name] + Process.hibernate(Function, :identity, [nil]) + end) + end + + @impl Cache + def child_spec({cache_name, opts}) do + %{ + id: "#{cache_name}_elixir_cache_persistent_term", + start: {Cache.PersistentTerm, :start_link, [Keyword.put(opts, :table_name, cache_name)]} + } + end + + @impl Cache + @spec get(atom, atom | String.t(), Keyword.t()) :: ErrorMessage.t_res(any) + def get(cache_name, key, _opts \\ []) do + {:ok, :persistent_term.get({cache_name, key}, nil)} + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + @impl Cache + @spec put(atom, atom | String.t(), pos_integer | nil, any, Keyword.t()) :: :ok | ErrorMessage.t() + def put(cache_name, key, _ttl \\ nil, value, _opts \\ []) do + :persistent_term.put({cache_name, key}, value) + :ok + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end + + @impl Cache + @spec delete(atom, atom | String.t(), Keyword.t()) :: :ok | ErrorMessage.t() + def delete(cache_name, key, _opts \\ []) do + :persistent_term.erase({cache_name, key}) + :ok + rescue + exception -> + {:error, ErrorMessage.internal_server_error(Exception.message(exception), %{cache: cache_name, key: key})} + end +end diff --git a/lib/cache/redis.ex b/lib/cache/redis.ex index 4d75156..5ee4b83 100644 --- a/lib/cache/redis.ex +++ b/lib/cache/redis.ex @@ -99,6 +99,11 @@ defmodule Cache.Redis do end def hash_set_many(keys_fields_values, ttl \\ nil) do + keys_fields_values = + Enum.map(keys_fields_values, fn {key, fields_values} -> + {maybe_sandbox_key(key), fields_values} + end) + @cache_adapter.hash_set_many(@cache_name, keys_fields_values, ttl, adapter_options()) end diff --git a/lib/cache/refresh_ahead.ex b/lib/cache/refresh_ahead.ex new file mode 100644 index 0000000..e09298c --- /dev/null +++ b/lib/cache/refresh_ahead.ex @@ -0,0 +1,305 @@ +defmodule Cache.RefreshAhead do + @moduledoc """ + Refresh-ahead caching strategy that proactively refreshes values before they expire. + + Values are stored with a timestamp. On `get`, if the value is within the + refresh window (i.e. `now - inserted_at >= ttl - refresh_before`), the current + value is returned immediately and an async `Task` is spawned to refresh it. + + Only keys that are actively read get refreshed — unread keys naturally expire + from the underlying adapter. + + ## Usage + + ```elixir + defmodule MyApp.Cache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.Redis}, + name: :my_cache, + opts: [ + uri: "redis://localhost:6379", + refresh_before: :timer.seconds(30) + ] + + def refresh(key) do + {:ok, fetch_fresh_value(key)} + end + end + ``` + + ## Options + + #{NimbleOptions.docs([ + refresh_before: [ + type: :pos_integer, + required: true, + doc: "Milliseconds before TTL expiry at which to trigger a background refresh." + ], + on_refresh: [ + type: {:or, [:mfa, {:fun, 1}]}, + doc: "Optional refresh callback. If not provided, the cache module must define `refresh/1`." + ], + lock_node_whitelist: [ + type: {:or, [:atom, {:list, :atom}]}, + doc: "Optional node whitelist for distributed refresh locks. Defaults to all connected nodes." + ] + ])} + + ## How It Works + + 1. `put/5` wraps the value as `{value, inserted_at_ms, ttl_ms}` before delegating. + 2. `get/4` unwraps the tuple. If `now - inserted_at >= ttl - refresh_before`, + a background `Task` is spawned to call the refresh callback with the key. + 3. A per-cache ETS deduplication table (`:_refresh_tracker`) prevents + multiple concurrent refresh tasks for the same key. + 4. On successful refresh, `put/5` is called with the new value and the same TTL, + resetting the inserted_at timestamp. + + > **Note**: When `sandbox?: true`, values are stored unwrapped. Refresh logic + > is bypassed entirely. + """ + + @behaviour Cache.Strategy + + @opts_definition [ + refresh_before: [ + type: :pos_integer, + required: true, + doc: "Milliseconds before TTL expiry to trigger a background refresh." + ], + on_refresh: [ + type: {:or, [:mfa, {:fun, 1}]}, + doc: "Optional refresh callback override." + ], + lock_node_whitelist: [ + type: {:or, [:atom, {:list, :atom}]}, + doc: "Optional node whitelist for distributed refresh locks." + ] + ] + + @impl Cache.Strategy + def opts_definition, do: @opts_definition + + @impl Cache.Strategy + def child_spec({cache_name, underlying_adapter, adapter_opts}) do + tracker_name = tracker_name(cache_name) + + underlying_adapter_opts = + validate_underlying_opts( + underlying_adapter, + Keyword.drop(adapter_opts, [:refresh_before, :on_refresh, :lock_node_whitelist, :__cache_module__]) + ) + + %{ + id: :"#{cache_name}_refresh_ahead_supervisor", + type: :supervisor, + start: + {Supervisor, :start_link, + [ + [ + underlying_adapter.child_spec({cache_name, underlying_adapter_opts}), + %{ + id: :"#{cache_name}_refresh_tracker", + start: {__MODULE__, :start_tracker, [tracker_name]} + } + ], + [strategy: :one_for_one] + ]} + } + end + + @doc false + def start_tracker(tracker_name) do + if :ets.whereis(tracker_name) === :undefined do + :ets.new(tracker_name, [:set, :public, :named_table]) + end + + Task.start_link(fn -> Process.hibernate(Function, :identity, [nil]) end) + end + + @impl Cache.Strategy + def get(cache_name, key, underlying_adapter, adapter_opts) do + + underlying_opts = + validate_underlying_opts( + underlying_adapter, + Keyword.drop(adapter_opts, [:refresh_before, :on_refresh, :lock_node_whitelist, :__cache_module__]) + ) + + compression_level = underlying_opts[:compression_level] + + case underlying_adapter.get(cache_name, key, underlying_opts) do + {:ok, nil} -> + {:ok, nil} + + {:ok, encoded} -> + case Cache.TermEncoder.decode(encoded) do + {value, inserted_at, ttl} -> + maybe_refresh_async( + cache_name, key, value, inserted_at, ttl, + underlying_adapter, adapter_opts, compression_level + ) + {:ok, value} + + value -> + {:ok, value} + end + + {:error, _} = error -> + error + end + end + + @impl Cache.Strategy + def put(cache_name, key, ttl, value, underlying_adapter, adapter_opts) do + + underlying_opts = + validate_underlying_opts( + underlying_adapter, + Keyword.drop(adapter_opts, [:refresh_before, :on_refresh, :lock_node_whitelist, :__cache_module__]) + ) + + compression_level = underlying_opts[:compression_level] + inserted_at = System.monotonic_time(:millisecond) + wrapped = Cache.TermEncoder.encode({value, inserted_at, ttl}, compression_level) + underlying_adapter.put(cache_name, key, ttl, wrapped, underlying_opts) + end + + @impl Cache.Strategy + def delete(cache_name, key, underlying_adapter, adapter_opts) do + + underlying_opts = + validate_underlying_opts( + underlying_adapter, + Keyword.drop(adapter_opts, [:refresh_before, :on_refresh, :lock_node_whitelist, :__cache_module__]) + ) + + tracker = tracker_name(cache_name) + safe_ets_delete(tracker, key) + underlying_adapter.delete(cache_name, key, underlying_opts) + end + + defp maybe_refresh_async(cache_name, key, _value, inserted_at, ttl, underlying_adapter, adapter_opts, compression_level) + when is_integer(ttl) do + refresh_before = adapter_opts[:refresh_before] + now = System.monotonic_time(:millisecond) + age = now - inserted_at + + if age >= ttl - refresh_before do + maybe_spawn_refresh(cache_name, key, ttl, underlying_adapter, adapter_opts, compression_level) + end + end + + defp maybe_refresh_async(_cache_name, _key, _value, _inserted_at, _ttl, _underlying_adapter, _adapter_opts, _compression_level), + do: :ok + + defp maybe_spawn_refresh(cache_name, key, ttl, underlying_adapter, adapter_opts, compression_level) do + tracker = tracker_name(cache_name) + + if safe_ets_insert_new(tracker, {key, true}) do + Task.start(fn -> + lock_resource = {:refresh_ahead_lock, cache_name, key} + lock_id = {lock_resource, self()} + lock_nodes = lock_nodes(adapter_opts[:lock_node_whitelist]) + + try do + if safe_global_set_lock(lock_id, lock_nodes) do + on_refresh = adapter_opts[:on_refresh] + cache_module = adapter_opts[:__cache_module__] + + case invoke_refresh(on_refresh, cache_module, key) do + {:ok, new_value} -> + underlying_opts = + Keyword.drop(adapter_opts, [ + :refresh_before, + :on_refresh, + :lock_node_whitelist, + :__cache_module__ + ]) + + new_inserted_at = System.monotonic_time(:millisecond) + wrapped = Cache.TermEncoder.encode({new_value, new_inserted_at, ttl}, compression_level) + underlying_adapter.put(cache_name, key, ttl, wrapped, underlying_opts) + + {:error, _} -> + :ok + end + end + after + safe_ets_delete(tracker, key) + end + end) + end + end + + defp invoke_refresh(nil, cache_module, key) when is_atom(cache_module) and not is_nil(cache_module) do + cache_module.refresh(key) + rescue + UndefinedFunctionError -> + {:error, + ErrorMessage.internal_server_error( + "Cache.RefreshAhead requires a refresh/1 callback on #{inspect(cache_module)} or an on_refresh opt", + %{cache_module: cache_module, key: key} + )} + end + + defp invoke_refresh(nil, cache_module, key) do + {:error, + ErrorMessage.internal_server_error( + "Cache.RefreshAhead requires a refresh/1 callback or an on_refresh opt", + %{cache_module: cache_module, key: key} + )} + end + + defp invoke_refresh({module, function, args}, _cache_name, key) do + apply(module, function, args ++ [key]) + end + + defp invoke_refresh(fun, _cache_name, key) when is_function(fun, 1) do + fun.(key) + end + + defp safe_ets_insert_new(tracker, record) do + :ets.insert_new(tracker, record) + rescue + _ -> false + end + + defp safe_ets_delete(tracker, key) do + :ets.delete(tracker, key) + rescue + _ -> :ok + end + + defp safe_global_set_lock(lock_id, lock_nodes) do + :global.set_lock(lock_id, lock_nodes, 0) + rescue + _ -> false + end + + defp lock_nodes(lock_node_whitelist) do + connected_nodes = [Node.self() | Node.list()] + + nodes = + case lock_node_whitelist do + nil -> connected_nodes + node when is_atom(node) -> [node] + whitelist when is_list(whitelist) -> whitelist + end + + nodes + |> Enum.filter(&(&1 in connected_nodes)) + |> Kernel.++([Node.self()]) + |> Enum.uniq() + end + + defp validate_underlying_opts(adapter, opts) do + if Code.ensure_loaded?(adapter) and function_exported?(adapter, :opts_definition, 0) do + NimbleOptions.validate!(opts, adapter.opts_definition()) + else + opts + end + end + + defp tracker_name(cache_name), do: :"#{cache_name}_refresh_tracker" +end diff --git a/lib/cache/strategy.ex b/lib/cache/strategy.ex new file mode 100644 index 0000000..bb40c02 --- /dev/null +++ b/lib/cache/strategy.ex @@ -0,0 +1,97 @@ +defmodule Cache.Strategy do + @moduledoc """ + Behaviour for strategy-based cache adapters. + + Strategy adapters compose over existing cache adapters to provide higher-level + caching patterns such as consistent hashing, multi-layer cascading, and + refresh-ahead semantics. + + Unlike regular adapters which implement `Cache` directly, strategy adapters + receive the underlying adapter module and its resolved options so they can + delegate operations appropriately. + + ## Usage + + Strategies are specified using the tuple format in `use Cache`: + + ```elixir + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :my_cache, + opts: [read_concurrency: true] + ``` + + The first element is the strategy module, the second is the underlying adapter + (or strategy-specific configuration). + """ + + @doc """ + Returns the NimbleOptions schema for validating strategy-level opts. + """ + @callback opts_definition() :: Keyword.t() + + @doc """ + Returns a supervisor child spec for the strategy. + + Receives the cache name, strategy config (the second element of the adapter + tuple), and the resolved underlying adapter opts. + """ + @callback child_spec({ + cache_name :: atom, + strategy_config :: term, + adapter_opts :: Keyword.t() + }) :: Supervisor.child_spec() | :supervisor.child_spec() + + @doc """ + Fetches a value from the cache using the strategy's routing/layering logic. + """ + @callback get( + cache_name :: atom, + key :: atom | String.t(), + strategy_config :: term, + adapter_opts :: Keyword.t() + ) :: ErrorMessage.t_res(any) + + @doc """ + Stores a value in the cache using the strategy's routing/layering logic. + """ + @callback put( + cache_name :: atom, + key :: atom | String.t(), + ttl :: pos_integer | nil, + value :: any, + strategy_config :: term, + adapter_opts :: Keyword.t() + ) :: :ok | ErrorMessage.t() + + @doc """ + Removes a value from the cache using the strategy's routing/layering logic. + """ + @callback delete( + cache_name :: atom, + key :: atom | String.t(), + strategy_config :: term, + adapter_opts :: Keyword.t() + ) :: :ok | ErrorMessage.t() + + @doc """ + Returns true if the given module implements the `Cache.Strategy` behaviour. + """ + @spec strategy?(module()) :: boolean() + def strategy?(module) do + module + |> module_behaviours() + |> Enum.member?(Cache.Strategy) + rescue + _ -> false + end + + defp module_behaviours(module) do + module + |> :erlang.apply(:module_info, [:attributes]) + |> Keyword.get_values(:behaviour) + |> List.flatten() + rescue + _ -> [] + end +end diff --git a/mix.exs b/mix.exs index ee25bb2..b651d17 100644 --- a/mix.exs +++ b/mix.exs @@ -50,6 +50,7 @@ defmodule ElixirCache.MixProject do {:telemetry, "~> 1.1"}, {:telemetry_metrics, "~> 1.0"}, {:prometheus_telemetry, "~> 0.3", optional: true}, + {:libring, "~> 1.7"}, {:faker, "~> 0.17", only: [:test]}, {:credo, "~> 1.6", only: [:test, :dev], runtime: false}, {:blitz_credo_checks, "~> 0.1", only: [:test, :dev], runtime: false}, @@ -84,6 +85,7 @@ defmodule ElixirCache.MixProject do "guides/tutorials/installation.md", "guides/tutorials/basic_operations.md", "guides/tutorials/advanced_techniques.md", + "guides/how-to/using_strategies.md", "guides/how-to/choosing_adapter.md", "guides/how-to/redis_setup.md", "guides/how-to/testing_with_cache.md", @@ -107,7 +109,9 @@ defmodule ElixirCache.MixProject do Cache.ETS, Cache.DETS, Cache.Redis, - Cache.ConCache + Cache.ConCache, + Cache.PersistentTerm, + Cache.Counter ], "Test Utils": [ Cache.Sandbox, diff --git a/mix.lock b/mix.lock index 23ca2be..c42dc4d 100644 --- a/mix.lock +++ b/mix.lock @@ -18,6 +18,7 @@ "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, diff --git a/test/cache/counter_test.exs b/test/cache/counter_test.exs new file mode 100644 index 0000000..3b64418 --- /dev/null +++ b/test/cache/counter_test.exs @@ -0,0 +1,121 @@ +defmodule Cache.CounterTest do + use ExUnit.Case, async: true + + defmodule TestCounterCache do + use Cache, + adapter: Cache.Counter, + name: :test_counter_cache, + opts: [initial_size: 16] + end + + setup do + start_supervised({Cache, [TestCounterCache]}) + Process.sleep(50) + :ok + end + + describe "get/1" do + test "returns nil for an unknown key" do + assert {:ok, nil} === TestCounterCache.get(:unknown_key) + end + + test "returns integer value after increment" do + TestCounterCache.increment(:get_test_key) + assert {:ok, 1} === TestCounterCache.get(:get_test_key) + end + end + + describe "put/2 as increment/decrement" do + test "put with 1 increments the counter" do + assert :ok === TestCounterCache.put(:put_inc_key, 1) + assert {:ok, 1} === TestCounterCache.get(:put_inc_key) + assert :ok === TestCounterCache.put(:put_inc_key, 1) + assert {:ok, 2} === TestCounterCache.get(:put_inc_key) + end + + test "put with -1 decrements the counter" do + TestCounterCache.put(:put_dec_key, 1) + TestCounterCache.put(:put_dec_key, 1) + assert :ok === TestCounterCache.put(:put_dec_key, -1) + assert {:ok, 1} === TestCounterCache.get(:put_dec_key) + end + + test "put with 0 returns an error" do + assert {:error, _} = TestCounterCache.put(:bad_key, 0) + end + + test "put with 2 returns an error" do + assert {:error, _} = TestCounterCache.put(:bad_key, 2) + end + + test "put with a string returns an error" do + assert {:error, _} = TestCounterCache.put(:bad_key, "up") + end + end + + describe "increment/1,2" do + test "increments by 1 by default" do + assert :ok === TestCounterCache.increment(:inc_key) + assert {:ok, 1} === TestCounterCache.get(:inc_key) + end + + test "increments by the given step" do + assert :ok === TestCounterCache.increment(:inc_step_key, 5) + assert {:ok, 5} === TestCounterCache.get(:inc_step_key) + end + + test "increments multiple times" do + TestCounterCache.increment(:inc_multi_key) + TestCounterCache.increment(:inc_multi_key) + TestCounterCache.increment(:inc_multi_key) + assert {:ok, 3} === TestCounterCache.get(:inc_multi_key) + end + end + + describe "decrement/1,2" do + test "decrements by 1 by default" do + TestCounterCache.increment(:dec_key, 3) + assert :ok === TestCounterCache.decrement(:dec_key) + assert {:ok, 2} === TestCounterCache.get(:dec_key) + end + + test "decrements by the given step" do + TestCounterCache.increment(:dec_step_key, 10) + assert :ok === TestCounterCache.decrement(:dec_step_key, 4) + assert {:ok, 6} === TestCounterCache.get(:dec_step_key) + end + + test "can go negative" do + assert :ok === TestCounterCache.decrement(:neg_key, 5) + assert {:ok, -5} === TestCounterCache.get(:neg_key) + end + end + + describe "delete/1" do + test "removes the key so get returns nil" do + TestCounterCache.increment(:del_key) + assert :ok === TestCounterCache.delete(:del_key) + assert {:ok, nil} === TestCounterCache.get(:del_key) + end + + test "is a no-op for a non-existent key" do + assert :ok === TestCounterCache.delete(:never_set_del_key) + end + + test "after delete, incrementing creates a fresh counter" do + TestCounterCache.increment(:reuse_key, 10) + TestCounterCache.delete(:reuse_key) + TestCounterCache.increment(:reuse_key) + assert {:ok, 1} === TestCounterCache.get(:reuse_key) + end + end + + describe "multiple keys are independent" do + test "counters for different keys do not interfere" do + TestCounterCache.increment(:key_a, 3) + TestCounterCache.increment(:key_b, 7) + assert {:ok, 3} === TestCounterCache.get(:key_a) + assert {:ok, 7} === TestCounterCache.get(:key_b) + end + end +end diff --git a/test/cache/hash_ring_test.exs b/test/cache/hash_ring_test.exs new file mode 100644 index 0000000..b687d2a --- /dev/null +++ b/test/cache/hash_ring_test.exs @@ -0,0 +1,290 @@ +defmodule Cache.HashRingTest do + use ExUnit.Case, async: true + + defmodule TestHashRingCache do + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :test_hash_ring_cache, + opts: [] + end + + setup do + start_supervised!(%{ + id: :hash_ring_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[TestHashRingCache], [name: :hash_ring_cache_sup]]} + }) + + Process.sleep(50) + + :ok + end + + describe "put/3 and get/1" do + test "stores and retrieves a value" do + assert :ok === TestHashRingCache.put("user:1", "Alice") + assert {:ok, "Alice"} === TestHashRingCache.get("user:1") + end + + test "returns nil for missing keys" do + assert {:ok, nil} === TestHashRingCache.get("missing:key") + end + + test "stores and retrieves different value types" do + assert :ok === TestHashRingCache.put("map_key", %{name: "Bob", age: 30}) + assert {:ok, %{name: "Bob", age: 30}} === TestHashRingCache.get("map_key") + + assert :ok === TestHashRingCache.put("list_key", [1, 2, 3]) + assert {:ok, [1, 2, 3]} === TestHashRingCache.get("list_key") + end + + test "overwrites existing values" do + assert :ok === TestHashRingCache.put("overwrite_key", "first") + assert :ok === TestHashRingCache.put("overwrite_key", "second") + assert {:ok, "second"} === TestHashRingCache.get("overwrite_key") + end + end + + describe "delete/1" do + test "removes a stored value" do + assert :ok === TestHashRingCache.put("delete_key", "to_delete") + assert {:ok, "to_delete"} === TestHashRingCache.get("delete_key") + assert :ok === TestHashRingCache.delete("delete_key") + assert {:ok, nil} === TestHashRingCache.get("delete_key") + end + + test "deleting a non-existent key returns ok" do + assert :ok === TestHashRingCache.delete("nonexistent_key") + end + end + + describe "cache_adapter/0" do + test "returns the strategy module as adapter" do + assert TestHashRingCache.cache_adapter() === Cache.HashRing + end + end + + describe "single-node ring routing" do + test "routes all keys to Node.self() when single node" do + assert :ok === TestHashRingCache.put("ring:key1", "val1") + assert :ok === TestHashRingCache.put("ring:key2", "val2") + assert :ok === TestHashRingCache.put("ring:key3", "val3") + + assert {:ok, "val1"} === TestHashRingCache.get("ring:key1") + assert {:ok, "val2"} === TestHashRingCache.get("ring:key2") + assert {:ok, "val3"} === TestHashRingCache.get("ring:key3") + end + end + + describe "get_or_create/2" do + test "creates value when missing" do + result = + TestHashRingCache.get_or_create("create_key", fn -> + {:ok, "created_value"} + end) + + assert {:ok, "created_value"} === result + assert {:ok, "created_value"} === TestHashRingCache.get("create_key") + end + + test "returns existing value without calling function" do + TestHashRingCache.put("existing_key", "existing_value") + + result = + TestHashRingCache.get_or_create("existing_key", fn -> + raise "should not be called" + end) + + assert {:ok, "existing_value"} === result + end + end + + describe "Cache.Strategy.strategy?/1" do + test "recognises Cache.HashRing as a strategy" do + assert Cache.Strategy.strategy?(Cache.HashRing) === true + end + + test "does not recognise regular adapters as strategies" do + refute Cache.Strategy.strategy?(Cache.ETS) + refute Cache.Strategy.strategy?(Cache.Agent) + end + end + + describe "read-repair" do + test "returns nil when no previous rings exist and key is missing" do + assert {:ok, nil} === TestHashRingCache.get("repair:missing") + end + + test "skips previous rings where old node equals current node" do + cache_name = :test_hash_ring_cache + + previous_ring = HashRing.add_node(HashRing.new(), Node.self()) + inject_previous_rings(cache_name, [previous_ring]) + + assert {:ok, nil} === TestHashRingCache.get("repair:same_node_key") + end + + test "skips previous rings where old node is not live" do + cache_name = :test_hash_ring_cache + + previous_ring = HashRing.add_node(HashRing.new(), :dead_node@nowhere) + inject_previous_rings(cache_name, [previous_ring]) + + assert {:ok, nil} === TestHashRingCache.get("repair:dead_node_key") + end + + test "deduplicates rpc attempts across ring generations pointing to the same node" do + cache_name = :test_hash_ring_cache + fake_node = :fake_dedup@node + call_count = :"dedup_count_#{:erlang.unique_integer([:positive])}" + {:ok, _} = Agent.start_link(fn -> 0 end, name: call_count) + + rpc_module = build_rpc(fn _node, _mod, _func, _args -> + Agent.update(call_count, &(&1 + 1)) + {:ok, nil} + end) + + previous_ring1 = HashRing.add_node(HashRing.new(), fake_node) + previous_ring2 = HashRing.add_node(HashRing.new(), fake_node) + inject_previous_rings(cache_name, [previous_ring1, previous_ring2]) + + Cache.HashRing.get(cache_name, "repair:dedup_key", Cache.ETS, rpc_module: rpc_module) + + Process.sleep(50) + + assert Agent.get(call_count, & &1) === 1 + end + + test "recovers value via read-repair from old ring owner" do + cache_name = :test_hash_ring_cache + fake_node = :fake_live@node + encoded = Cache.TermEncoder.encode("repaired_value", nil) + + rpc_module = build_rpc(fn node, _mod, func, _args -> + cond do + node === fake_node and func === :get -> {:ok, encoded} + func === :put -> :ok + func === :delete -> :ok + true -> {:ok, nil} + end + end) + + previous_ring = HashRing.add_node(HashRing.new(), fake_node) + inject_previous_rings(cache_name, [previous_ring]) + + result = Cache.HashRing.get(cache_name, "repair:rpc_key", Cache.ETS, rpc_module: rpc_module) + + assert {:ok, "repaired_value"} === result + end + + test "migrates value to current node and schedules async delete on repair" do + cache_name = :test_hash_ring_cache + fake_node = :fake_migrate@node + encoded = Cache.TermEncoder.encode("migrate_me", nil) + calls_agent = :"repair_calls_#{:erlang.unique_integer([:positive])}" + {:ok, _} = Agent.start_link(fn -> [] end, name: calls_agent) + + rpc_module = build_rpc(fn node, _mod, func, _args -> + Agent.update(calls_agent, fn acc -> [{node, func} | acc] end) + + cond do + node === fake_node and func === :get -> {:ok, encoded} + func === :put -> :ok + func === :delete -> :ok + true -> {:ok, nil} + end + end) + + previous_ring = HashRing.add_node(HashRing.new(), fake_node) + inject_previous_rings(cache_name, [previous_ring]) + + Cache.HashRing.get(cache_name, "repair:migrate_key2", Cache.ETS, rpc_module: rpc_module) + + Process.sleep(100) + + calls = Agent.get(calls_agent, & &1) + assert Enum.any?(calls, fn {_node, func} -> func === :delete end) + end + end + + describe "Cache.HashRing.RingMonitor" do + test "starts with empty ring history" do + rings = Cache.HashRing.RingMonitor.previous_rings(:test_hash_ring_cache) + assert rings === [] + end + + test "previous_rings/1 returns empty list for unknown cache" do + rings = Cache.HashRing.RingMonitor.previous_rings(:nonexistent_cache) + assert rings === [] + end + + test "ring_history_size option limits stored snapshots" do + cache_name = :test_history_size_cache + + defmodule HistorySizeCache do + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :test_history_size_cache, + opts: [ring_history_size: 2] + end + + start_supervised!(%{ + id: :history_size_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[HistorySizeCache], [name: :history_size_cache_sup]]} + }) + + ring1 = HashRing.add_node(HashRing.new(), :node1@host) + ring2 = HashRing.add_node(HashRing.new(), :node2@host) + ring3 = HashRing.add_node(HashRing.new(), :node3@host) + + inject_previous_rings(cache_name, [ring3, ring2, ring1]) + + stored = Cache.HashRing.RingMonitor.previous_rings(cache_name) + assert length(stored) === 3 + + table = Cache.HashRing.RingMonitor.history_table_name(cache_name) + :ets.insert(table, {:previous_rings, [ring3, ring2, ring1]}) + stored = Cache.HashRing.RingMonitor.previous_rings(cache_name) + assert length(stored) === 3 + + :ets.insert(table, {:previous_rings, Enum.take([ring3, ring2, ring1], 2)}) + stored = Cache.HashRing.RingMonitor.previous_rings(cache_name) + assert length(stored) === 2 + end + end + + defp inject_previous_rings(cache_name, rings) do + table = Cache.HashRing.RingMonitor.history_table_name(cache_name) + :ets.insert(table, {:previous_rings, rings}) + end + + defp build_rpc(fun) do + key = {Cache.HashRingTest.StubRpc, self()} + :persistent_term.put(key, fun) + on_exit(fn -> :persistent_term.erase(key) end) + Cache.HashRingTest.StubRpc + end + + defmodule StubRpc do + def call(node, mod, func, args) do + pid = find_registered_pid() + + case pid && :persistent_term.get({Cache.HashRingTest.StubRpc, pid}, nil) do + nil -> {:ok, nil} + fun -> fun.(node, mod, func, args) + end + end + + defp find_registered_pid do + callers = Process.get(:"$callers", []) + ancestors = Process.get(:"$ancestors", []) + + [self() | callers ++ ancestors] + |> Enum.filter(&is_pid/1) + |> Enum.find(fn pid -> + :persistent_term.get({Cache.HashRingTest.StubRpc, pid}, nil) !== nil + end) + end + end +end diff --git a/test/cache/multi_layer_test.exs b/test/cache/multi_layer_test.exs new file mode 100644 index 0000000..b6bc470 --- /dev/null +++ b/test/cache/multi_layer_test.exs @@ -0,0 +1,159 @@ +defmodule Cache.MultiLayerTest do + use ExUnit.Case, async: true + + defmodule FastCache do + use Cache, + adapter: Cache.ETS, + name: :ml_fast_cache, + opts: [] + end + + defmodule SlowCache do + use Cache, + adapter: Cache.ETS, + name: :ml_slow_cache, + opts: [] + end + + defmodule TwoLayerCache do + use Cache, + adapter: {Cache.MultiLayer, [FastCache, SlowCache]}, + name: :two_layer_cache, + opts: [] + end + + defmodule FetchCache do + use Cache, + adapter: {Cache.MultiLayer, [FastCache, SlowCache]}, + name: :fetch_cache, + opts: [on_fetch: &__MODULE__.fetch/1] + + def fetch(key), do: {:ok, "fetched:#{key}"} + end + + defmodule BackfillTtlCache do + use Cache, + adapter: {Cache.MultiLayer, [FastCache, SlowCache]}, + name: :backfill_ttl_cache, + opts: [backfill_ttl: 5000] + end + + setup do + start_supervised!(%{ + id: :fast_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[FastCache, SlowCache], [name: :fast_slow_sup]]} + }) + + start_supervised!(%{ + id: :two_layer_sup, + type: :supervisor, + start: {Cache, :start_link, [[TwoLayerCache], [name: :two_layer_sup]]} + }) + + start_supervised!(%{ + id: :fetch_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[FetchCache], [name: :fetch_cache_sup]]} + }) + + start_supervised!(%{ + id: :backfill_ttl_sup, + type: :supervisor, + start: {Cache, :start_link, [[BackfillTtlCache], [name: :backfill_ttl_sup]]} + }) + + :ok + end + + describe "put/3 writes slowest to fastest" do + test "value is available in both layers after put" do + assert :ok === TwoLayerCache.put("write_key", "write_value") + + assert {:ok, "write_value"} === FastCache.get("write_key") + assert {:ok, "write_value"} === SlowCache.get("write_key") + end + + test "overwrites existing values in all layers" do + assert :ok === TwoLayerCache.put("overwrite_key", "first") + assert :ok === TwoLayerCache.put("overwrite_key", "second") + + assert {:ok, "second"} === FastCache.get("overwrite_key") + assert {:ok, "second"} === SlowCache.get("overwrite_key") + end + end + + describe "get/1 reads fastest to slowest" do + test "returns value from fast layer when present" do + FastCache.put("fast_only", "from_fast") + SlowCache.put("fast_only", "from_slow") + + assert {:ok, "from_fast"} === TwoLayerCache.get("fast_only") + end + + test "returns value from slow layer when fast layer misses" do + SlowCache.put("slow_only", "from_slow") + + assert {:ok, "from_slow"} === TwoLayerCache.get("slow_only") + end + + test "returns nil when all layers miss" do + assert {:ok, nil} === TwoLayerCache.get("totally_missing") + end + end + + describe "get/1 backfills faster layers on slow-layer hit" do + test "backfills fast layer when value found in slow layer" do + SlowCache.put("backfill_key", "backfill_value") + + assert {:ok, nil} === FastCache.get("backfill_key") + + assert {:ok, "backfill_value"} === TwoLayerCache.get("backfill_key") + + assert {:ok, "backfill_value"} === FastCache.get("backfill_key") + end + end + + describe "delete/1 removes from all layers" do + test "deletes from all layers" do + assert :ok === TwoLayerCache.put("delete_key", "to_delete") + + assert {:ok, "to_delete"} === TwoLayerCache.get("delete_key") + + assert :ok === TwoLayerCache.delete("delete_key") + + assert {:ok, nil} === TwoLayerCache.get("delete_key") + end + end + + describe "on_fetch callback" do + test "calls fetch callback on total miss" do + assert {:ok, "fetched:missing_key"} === FetchCache.get("missing_key") + end + + test "backfills all layers after fetch" do + FetchCache.get("fetch_backfill_key") + + assert {:ok, "fetched:fetch_backfill_key"} === FastCache.get("fetch_backfill_key") + assert {:ok, "fetched:fetch_backfill_key"} === SlowCache.get("fetch_backfill_key") + end + + test "does not call fetch when value exists" do + FastCache.put("existing_key", "existing_value") + + assert {:ok, "existing_value"} === FetchCache.get("existing_key") + end + end + + describe "cache_adapter/0" do + test "returns Cache.MultiLayer as adapter" do + assert TwoLayerCache.cache_adapter() === Cache.MultiLayer + end + end + + describe "Cache.Strategy.strategy?/1" do + test "recognises Cache.MultiLayer as a strategy" do + assert Cache.Strategy.strategy?(Cache.MultiLayer) === true + end + end +end diff --git a/test/cache/persistent_term_test.exs b/test/cache/persistent_term_test.exs new file mode 100644 index 0000000..d4e4926 --- /dev/null +++ b/test/cache/persistent_term_test.exs @@ -0,0 +1,60 @@ +defmodule Cache.PersistentTermTest do + use ExUnit.Case, async: true + + defmodule TestPersistentTermCache do + use Cache, + adapter: Cache.PersistentTerm, + name: :test_persistent_term_cache, + opts: [] + end + + setup do + start_supervised({Cache, [TestPersistentTermCache]}) + + on_exit(fn -> + :persistent_term.erase({:test_persistent_term_cache, :cleanup_key}) + end) + + :ok + end + + describe "put/3 and get/1" do + test "stores and retrieves a value" do + assert :ok === TestPersistentTermCache.put(:my_key, "hello") + assert {:ok, "hello"} === TestPersistentTermCache.get(:my_key) + end + + test "returns nil for unknown key" do + assert {:ok, nil} === TestPersistentTermCache.get(:nonexistent_key) + end + + test "overwrites an existing value" do + assert :ok === TestPersistentTermCache.put(:overwrite_key, "first") + assert :ok === TestPersistentTermCache.put(:overwrite_key, "second") + assert {:ok, "second"} === TestPersistentTermCache.get(:overwrite_key) + end + + test "stores complex terms" do + value = %{nested: [1, 2, 3], map: %{a: :b}} + assert :ok === TestPersistentTermCache.put(:complex_key, value) + assert {:ok, ^value} = TestPersistentTermCache.get(:complex_key) + end + + test "ttl argument is ignored" do + assert :ok === TestPersistentTermCache.put(:ttl_key, 1000, "still stored") + assert {:ok, "still stored"} === TestPersistentTermCache.get(:ttl_key) + end + end + + describe "delete/1" do + test "removes an existing key" do + assert :ok === TestPersistentTermCache.put(:delete_key, "value") + assert :ok === TestPersistentTermCache.delete(:delete_key) + assert {:ok, nil} === TestPersistentTermCache.get(:delete_key) + end + + test "is a no-op for non-existent key" do + assert :ok === TestPersistentTermCache.delete(:never_set_key) + end + end +end diff --git a/test/cache/refresh_ahead_test.exs b/test/cache/refresh_ahead_test.exs new file mode 100644 index 0000000..930a4cd --- /dev/null +++ b/test/cache/refresh_ahead_test.exs @@ -0,0 +1,179 @@ +defmodule Cache.RefreshAheadTest do + use ExUnit.Case, async: true + + defmodule TestRefreshCache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.ETS}, + name: :test_refresh_cache, + opts: [refresh_before: 500] + + def refresh(key), do: {:ok, "refreshed:#{key}"} + end + + defmodule CallbackRefreshCache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.ETS}, + name: :callback_refresh_cache, + opts: [ + refresh_before: 500, + on_refresh: &__MODULE__.custom_refresh/1 + ] + + def custom_refresh(key), do: {:ok, "custom:#{key}"} + end + + defmodule LockedRefreshCache do + use Cache, + adapter: {Cache.RefreshAhead, Cache.ETS}, + name: :locked_refresh_cache, + opts: [ + refresh_before: 500, + lock_node_whitelist: [node()] + ] + + def refresh(key), do: {:ok, "locked:#{key}"} + end + + setup do + start_supervised!(%{ + id: :refresh_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[TestRefreshCache], [name: :refresh_cache_sup]]} + }) + + start_supervised!(%{ + id: :callback_refresh_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[CallbackRefreshCache], [name: :callback_refresh_cache_sup]]} + }) + + start_supervised!(%{ + id: :locked_refresh_cache_sup, + type: :supervisor, + start: {Cache, :start_link, [[LockedRefreshCache], [name: :locked_refresh_cache_sup]]} + }) + + :ok + end + + describe "put/3 and get/1 - basic operations" do + test "stores and retrieves a value" do + assert :ok === TestRefreshCache.put("basic_key", 10_000, "hello") + assert {:ok, "hello"} === TestRefreshCache.get("basic_key") + end + + test "returns nil for missing keys" do + assert {:ok, nil} === TestRefreshCache.get("missing_key") + end + + test "stores complex values" do + assert :ok === TestRefreshCache.put("map_key", 10_000, %{a: 1, b: 2}) + assert {:ok, %{a: 1, b: 2}} === TestRefreshCache.get("map_key") + end + end + + describe "delete/1" do + test "removes a stored value" do + assert :ok === TestRefreshCache.put("delete_key", 10_000, "to_delete") + assert {:ok, "to_delete"} === TestRefreshCache.get("delete_key") + assert :ok === TestRefreshCache.delete("delete_key") + assert {:ok, nil} === TestRefreshCache.get("delete_key") + end + end + + describe "refresh-ahead behaviour" do + test "does not trigger refresh when far from TTL expiry" do + assert :ok === TestRefreshCache.put("no_refresh_key", 10_000, "original") + + assert {:ok, "original"} === TestRefreshCache.get("no_refresh_key") + + Process.sleep(50) + + assert {:ok, "original"} === TestRefreshCache.get("no_refresh_key") + end + + test "triggers async refresh when within refresh_before window" do + assert :ok === TestRefreshCache.put("refresh_key", 2000, "original") + + assert {:ok, "original"} === TestRefreshCache.get("refresh_key") + + Process.sleep(1600) + + assert {:ok, "original"} === TestRefreshCache.get("refresh_key") + + Process.sleep(200) + + assert {:ok, "refreshed:refresh_key"} === TestRefreshCache.get("refresh_key") + end + + test "on_refresh option overrides module callback" do + assert :ok === CallbackRefreshCache.put("cb_key", 2000, "original") + + assert {:ok, "original"} === CallbackRefreshCache.get("cb_key") + + Process.sleep(1600) + + assert {:ok, "original"} === CallbackRefreshCache.get("cb_key") + + Process.sleep(200) + + assert {:ok, "custom:cb_key"} === CallbackRefreshCache.get("cb_key") + end + end + + describe "deduplication" do + test "multiple concurrent gets only spawn one refresh task" do + assert :ok === TestRefreshCache.put("dedup_key", 2000, "original") + + Process.sleep(1600) + + tasks = + Enum.map(1..5, fn _ -> + Task.async(fn -> TestRefreshCache.get("dedup_key") end) + end) + + results = Task.await_many(tasks) + assert Enum.all?(results, fn {:ok, val} -> val === "original" end) + + Process.sleep(200) + + assert {:ok, "refreshed:dedup_key"} === TestRefreshCache.get("dedup_key") + end + + test "global lock prevents refresh while lock is held" do + lock_resource = {:refresh_ahead_lock, :locked_refresh_cache, "locked_key"} + lock_id = {lock_resource, self()} + lock_nodes = [Node.self()] + + assert true === :global.set_lock(lock_id, lock_nodes, 0) + assert :ok === LockedRefreshCache.put("locked_key", 2000, "original") + + Process.sleep(1600) + + assert {:ok, "original"} === LockedRefreshCache.get("locked_key") + + Process.sleep(250) + + assert {:ok, "original"} === LockedRefreshCache.get("locked_key") + assert true === :global.del_lock(lock_id, lock_nodes) + + assert {:ok, "original"} === LockedRefreshCache.get("locked_key") + + Process.sleep(250) + + assert {:ok, "locked:locked_key"} === LockedRefreshCache.get("locked_key") + end + end + + describe "cache_adapter/0" do + test "returns Cache.RefreshAhead as adapter" do + assert TestRefreshCache.cache_adapter() === Cache.RefreshAhead + end + end + + describe "Cache.Strategy.strategy?/1" do + test "recognises Cache.RefreshAhead as a strategy" do + assert Cache.Strategy.strategy?(Cache.RefreshAhead) === true + end + end +end diff --git a/test/cache_sandbox_test.exs b/test/cache_sandbox_test.exs index 627235c..5c45ebd 100644 --- a/test/cache_sandbox_test.exs +++ b/test/cache_sandbox_test.exs @@ -35,6 +35,10 @@ defmodule CacheSandboxTest do test "works to seperate caches between tests" do assert {:ok, nil} = TestCache.get(@cache_key) end + + test "adapter gets swapped to sandbox adapter" do + assert TestCache.cache_adapter() === Cache.Sandbox + end end describe "&json_get/1" do @@ -61,13 +65,13 @@ defmodule CacheSandboxTest do TestCache.json_get(key, [:some_array, 0]) end - test "returns :error tuple if path not found" do + test "returns :error tuple if path not found", %{key: key} do assert {:error, %ErrorMessage{ message: "ERR Path '$.c.d' does not exist", code: :not_found, details: nil - }} === TestCache.json_get(@cache_key, ["c.d"]) + }} === TestCache.json_get(key, ["c.d"]) end end diff --git a/test/cache_strategy_test.exs b/test/cache_strategy_test.exs new file mode 100644 index 0000000..1f78356 --- /dev/null +++ b/test/cache_strategy_test.exs @@ -0,0 +1,327 @@ +defmodule CacheStrategyTest do + use ExUnit.Case, async: true + + defmodule TestCache.RefreshAheadETS do + use Cache, + adapter: {Cache.RefreshAhead, Cache.ETS}, + name: :test_strategy_refresh_ahead_ets, + opts: [refresh_before: 500] + + def refresh(key), do: {:ok, "refreshed:#{key}"} + end + + defmodule TestCache.HashRingETS do + use Cache, + adapter: {Cache.HashRing, Cache.ETS}, + name: :test_strategy_hash_ring_ets, + opts: [] + end + + defmodule TestCache.MultiLayerETS do + use Cache, + adapter: {Cache.MultiLayer, [Cache.ETS, Cache.Agent]}, + name: :test_strategy_multi_layer_ets, + opts: [] + end + + defmodule TestCache.Layer1 do + use Cache, + adapter: Cache.ETS, + name: :test_multi_layer_layer1, + opts: [] + end + + defmodule TestCache.Layer2 do + use Cache, + adapter: Cache.Agent, + name: :test_multi_layer_layer2, + opts: [] + end + + defmodule TestCache.MultiLayerModules do + use Cache, + adapter: {Cache.MultiLayer, [TestCache.Layer1, TestCache.Layer2]}, + name: :test_strategy_multi_layer_modules, + opts: [] + end + + defmodule TestCache.MultiLayerFetch do + use Cache, + adapter: {Cache.MultiLayer, [Cache.ETS]}, + name: :test_strategy_multi_layer_fetch, + opts: [on_fetch: &__MODULE__.fetch/1] + + def fetch(key), do: {:ok, "fetched:#{key}"} + end + + @strategy_adapters [ + TestCache.RefreshAheadETS, + TestCache.HashRingETS, + TestCache.MultiLayerModules + ] + + for adapter <- @strategy_adapters do + describe "#{adapter} &get/1 & &put/2 & &delete/1" do + setup do + start_supervised!(%{ + id: :"#{unquote(adapter)}_sup", + type: :supervisor, + start: + {Cache, :start_link, + [ + [TestCache.Layer1, TestCache.Layer2, unquote(adapter)], + [name: :"#{unquote(adapter)}_sup"] + ]} + }) + + Process.sleep(50) + + :ok + end + + test "puts into the cache and can get it back after" do + test_key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + value = %{some_value: Faker.App.name()} + cache_module = unquote(adapter) + + assert {:ok, nil} = cache_module.get(test_key) + assert :ok = cache_module.put(test_key, value) + + Process.sleep(50) + + assert {:ok, value} === cache_module.get(test_key) + end + + test "deleting from cache works" do + test_key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + value = %{some_value: Faker.App.name()} + cache_module = unquote(adapter) + + assert :ok = cache_module.put(test_key, value) + + Process.sleep(50) + + assert :ok = cache_module.delete(test_key) + + Process.sleep(50) + + assert {:ok, nil} = cache_module.get(test_key) + end + + test "puts into the cache with nil acts like deleting" do + test_key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + value = %{some_value: Faker.App.name()} + cache_module = unquote(adapter) + + assert {:ok, nil} = cache_module.get(test_key) + assert :ok = cache_module.put(test_key, value) + + Process.sleep(50) + + assert {:ok, value} === cache_module.get(test_key) + assert :ok = cache_module.put(test_key, nil) + + Process.sleep(50) + + assert {:ok, nil} = cache_module.get(test_key) + end + end + + describe "#{adapter} &get_or_create/2" do + setup do + start_supervised!(%{ + id: :"#{unquote(adapter)}_get_or_create_sup", + type: :supervisor, + start: + {Cache, :start_link, + [ + [TestCache.Layer1, TestCache.Layer2, unquote(adapter)], + [name: :"#{unquote(adapter)}_get_or_create_sup"] + ]} + }) + + Process.sleep(50) + + :ok + end + + test "finds an item in the cache that already exists" do + test_key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + value = %{some_value: Faker.App.name()} + cache_module = unquote(adapter) + + assert :ok = cache_module.put(test_key, value) + + Process.sleep(50) + + assert {:ok, value} === + cache_module.get_or_create(test_key, fn -> + raise "I shouldn't be called" + end) + + assert {:ok, value} === cache_module.get(test_key) + end + + test "creates a value for key when key doesn't exist in cache" do + test_key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + value = %{some_value: Faker.App.name()} + cache_module = unquote(adapter) + + assert {:ok, nil} = cache_module.get(test_key) + + assert {:ok, value} === + cache_module.get_or_create(test_key, fn -> + {:ok, value} + end) + + Process.sleep(50) + + assert {:ok, value} === cache_module.get(test_key) + end + end + end + + describe "&cache_adapter/0" do + test "returns the strategy module for RefreshAhead" do + assert TestCache.RefreshAheadETS.cache_adapter() === Cache.RefreshAhead + end + + test "returns the strategy module for HashRing" do + assert TestCache.HashRingETS.cache_adapter() === Cache.HashRing + end + + test "returns the strategy module for MultiLayer" do + assert TestCache.MultiLayerETS.cache_adapter() === Cache.MultiLayer + end + end + + describe "Cache.Strategy.strategy?/1" do + test "returns true for Cache.RefreshAhead" do + assert Cache.Strategy.strategy?(Cache.RefreshAhead) === true + end + + test "returns true for Cache.HashRing" do + assert Cache.Strategy.strategy?(Cache.HashRing) === true + end + + test "returns true for Cache.MultiLayer" do + assert Cache.Strategy.strategy?(Cache.MultiLayer) === true + end + + test "returns false for plain adapters" do + refute Cache.Strategy.strategy?(Cache.ETS) + refute Cache.Strategy.strategy?(Cache.Agent) + refute Cache.Strategy.strategy?(Cache.Redis) + end + end + + describe "Cache.MultiLayer layered read behaviour" do + setup do + start_supervised!(%{ + id: :multi_layer_modules_sup, + type: :supervisor, + start: + {Cache, :start_link, + [ + [TestCache.Layer1, TestCache.Layer2, TestCache.MultiLayerModules], + [name: :multi_layer_modules_sup] + ]} + }) + + Process.sleep(50) + + :ok + end + + test "reads from layer1 first when value is present" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + TestCache.Layer1.put(key, "from_layer1") + + assert {:ok, "from_layer1"} === TestCache.MultiLayerModules.get(key) + end + + test "falls through to layer2 when layer1 misses" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + TestCache.Layer2.put(key, "from_layer2") + + assert {:ok, "from_layer2"} === TestCache.MultiLayerModules.get(key) + end + + test "backfills layer1 after a hit in layer2" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + TestCache.Layer2.put(key, "from_layer2") + + assert {:ok, "from_layer2"} === TestCache.MultiLayerModules.get(key) + + assert {:ok, "from_layer2"} === TestCache.Layer1.get(key) + end + + test "returns nil when all layers miss" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + + assert {:ok, nil} === TestCache.MultiLayerModules.get(key) + end + end + + describe "Cache.MultiLayer write behaviour" do + setup do + start_supervised!(%{ + id: :multi_layer_write_sup, + type: :supervisor, + start: + {Cache, :start_link, + [ + [TestCache.Layer1, TestCache.Layer2, TestCache.MultiLayerModules], + [name: :multi_layer_write_sup] + ]} + }) + + Process.sleep(50) + + :ok + end + + test "put writes to all layers" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + + assert :ok = TestCache.MultiLayerModules.put(key, "value") + + assert {:ok, "value"} === TestCache.Layer1.get(key) + assert {:ok, "value"} === TestCache.Layer2.get(key) + end + + test "delete removes from all layers" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + + TestCache.MultiLayerModules.put(key, "value") + assert :ok = TestCache.MultiLayerModules.delete(key) + + assert {:ok, nil} === TestCache.Layer1.get(key) + assert {:ok, nil} === TestCache.Layer2.get(key) + end + end + + describe "Cache.MultiLayer on_fetch callback" do + setup do + start_supervised!(%{ + id: :multi_layer_fetch_sup, + type: :supervisor, + start: + {Cache, :start_link, + [[TestCache.MultiLayerFetch], [name: :multi_layer_fetch_sup]]} + }) + + Process.sleep(50) + + :ok + end + + test "invokes fetch callback on total miss and backfills layers" do + key = "#{Faker.Pokemon.name()}_#{Enum.random(1..100_000_000_000)}" + + assert {:ok, "fetched:#{key}"} === TestCache.MultiLayerFetch.get(key) + + assert {:ok, "fetched:#{key}"} === TestCache.MultiLayerFetch.get(key) + end + end +end