Skip to content

Commit 4d1feb5

Browse files
committed
feat: add circuit breaker system for l2 cache
1 parent c443118 commit 4d1feb5

File tree

9 files changed

+205
-23
lines changed

9 files changed

+205
-23
lines changed

.changeset/good-candles-trade.md

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'bentocache': minor
3+
---
4+
5+
Added a super simple circuit breaker system to the L2 Cache :
6+
- a `l2CircuitBreakerDuration` parameter to set the duration of the circuit breaker. How many seconds the circuit breaker will stay open.
7+
- If defined, the circuit breaker will open when a call to our distributed cache fails. It will stay open for `l2CircuitBreakerDuration` seconds.
8+
9+
We may introduce more sophisticated circuit breaker system in the future, but for now, this simple system should be enough.

packages/bentocache/factories/cache_factory.ts

+1-10
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,7 @@ export class CacheFactory {
3232
*/
3333
#createCacheStack() {
3434
const options = new BentoCacheOptions({
35-
ttl: this.#parameters.ttl,
36-
grace: this.#parameters.grace,
37-
graceBackoff: this.#parameters.graceBackoff,
38-
timeout: this.#parameters.timeout,
39-
hardTimeout: this.#parameters.hardTimeout,
40-
logger: this.#parameters.logger,
41-
emitter: this.#parameters.emitter,
42-
lockTimeout: this.#parameters.lockTimeout,
43-
serializer: this.#parameters.serializer,
44-
onFactoryError: this.#parameters.onFactoryError,
35+
...this.#parameters,
4536
}).serializeL1Cache(this.#l1Options.serialize ?? true)
4637

4738
const stack = new CacheStack('primary', options, {

packages/bentocache/src/bento_cache_options.ts

+8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import lodash from '@poppinss/utils/lodash'
33
import string from '@poppinss/utils/string'
44
import { noopLogger } from '@julr/utils/logger'
55

6+
import { resolveTtl } from './helpers.js'
67
import type { FactoryError } from './errors.js'
78
import { JsonSerializer } from './serializers/json.js'
89
import type {
@@ -73,6 +74,12 @@ export class BentoCacheOptions {
7374
*/
7475
lockTimeout?: Duration = null
7576

77+
/**
78+
* Duration for the circuit breaker to stay open
79+
* if l2 cache fails
80+
*/
81+
l2CircuitBreakerDuration: number | undefined
82+
7683
/**
7784
* If the L1 cache should be serialized
7885
*/
@@ -93,6 +100,7 @@ export class BentoCacheOptions {
93100

94101
this.emitter = this.#options.emitter!
95102
this.serializer = this.#options.serializer ?? defaultSerializer
103+
this.l2CircuitBreakerDuration = resolveTtl(this.#options.l2CircuitBreakerDuration, null)
96104

97105
this.logger = this.#options.logger!.child({ pkg: 'bentocache' })
98106
this.onFactoryError = this.#options.onFactoryError

packages/bentocache/src/cache/cache_stack.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class CacheStack extends BaseDriver {
4545
this.options.serializeL1 ? this.options.serializer : undefined,
4646
)
4747
if (drivers.l2Driver)
48-
this.l2 = new RemoteCache(drivers.l2Driver, this.logger, this.options.serializer, !!this.l1)
48+
this.l2 = new RemoteCache(drivers.l2Driver, this.logger, !!this.l1, this.options)
4949

5050
this.bus = bus ? bus : this.#createBus(drivers.busDriver, drivers.busOptions)
5151
if (this.l1) this.bus?.manageCache(this.prefix, this.l1)

packages/bentocache/src/cache/facades/remote_cache.ts

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { is } from '@julr/utils/is'
22

33
import { CacheEntry } from '../cache_entry/cache_entry.js'
4+
import { CircuitBreaker } from '../../circuit_breaker/index.js'
5+
import type { L2CacheDriver, Logger } from '../../types/main.js'
6+
import type { BentoCacheOptions } from '../../bento_cache_options.js'
47
import type { CacheEntryOptions } from '../cache_entry/cache_entry_options.js'
5-
import type { CacheSerializer, L2CacheDriver, Logger } from '../../types/main.js'
68

79
/**
810
* RemoteCache is a wrapper around a L2 Cache Driver that provides
@@ -11,18 +13,23 @@ import type { CacheSerializer, L2CacheDriver, Logger } from '../../types/main.js
1113
export class RemoteCache {
1214
#driver: L2CacheDriver
1315
#logger: Logger
14-
#serializer: CacheSerializer
1516
#hasL1Backup: boolean
17+
#circuitBreaker?: CircuitBreaker
18+
#options: BentoCacheOptions
1619

1720
constructor(
1821
driver: L2CacheDriver,
1922
logger: Logger,
20-
serializer: CacheSerializer,
2123
hasL1Backup: boolean,
24+
options: BentoCacheOptions,
2225
) {
23-
this.#hasL1Backup = hasL1Backup
2426
this.#driver = driver
25-
this.#serializer = serializer
27+
this.#options = options
28+
this.#hasL1Backup = hasL1Backup
29+
this.#circuitBreaker = options.l2CircuitBreakerDuration
30+
? new CircuitBreaker({ breakDuration: options.l2CircuitBreakerDuration })
31+
: undefined
32+
2633
this.#logger = logger.child({ context: 'bentocache.remoteCache' })
2734
}
2835

@@ -36,11 +43,18 @@ export class RemoteCache {
3643
fallbackValue: unknown,
3744
fn: () => any,
3845
) {
46+
if (this.#circuitBreaker?.isOpen()) {
47+
this.#logger.error({ opId: options.id }, `circuit breaker is open. ignoring operation`)
48+
return fallbackValue
49+
}
50+
3951
try {
4052
return await fn()
4153
} catch (error) {
4254
this.#logger.error({ error, opId: options.id }, `(${operation}) failed on remote cache`)
4355

56+
this.#circuitBreaker?.open()
57+
4458
/**
4559
* SuppressL2Errors is enabled automatically if undefined and we have a L1 backup
4660
* Otherwise, we need to check what the user set
@@ -64,7 +78,7 @@ export class RemoteCache {
6478
const value = await this.#driver.get(key)
6579
if (value === undefined) return
6680

67-
return CacheEntry.fromDriver(key, value, this.#serializer)
81+
return CacheEntry.fromDriver(key, value, this.#options.serializer)
6882
})
6983
}
7084

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { InvalidArgumentsException } from '@poppinss/utils/exceptions'
2+
3+
const CircuitBreakerState = { Closed: 0, Open: 1 }
4+
5+
interface CircuitBreakerOptions {
6+
/**
7+
* In milliseconds, how long the circuit breaker should stay open
8+
*/
9+
breakDuration: number | undefined
10+
}
11+
12+
/**
13+
* Super simple circuit breaker implementation
14+
*/
15+
export class CircuitBreaker {
16+
#state = CircuitBreakerState.Closed
17+
#willCloseAt: number | null = null
18+
#breakDuration: number
19+
20+
constructor(options: CircuitBreakerOptions) {
21+
this.#breakDuration = options.breakDuration ?? 0
22+
if (this.#breakDuration < 0) {
23+
throw new InvalidArgumentsException('breakDuration must be a positive number')
24+
}
25+
26+
this.#state = CircuitBreakerState.Closed
27+
}
28+
29+
/**
30+
* Check if the circuit breaker should change state
31+
*/
32+
#checkState() {
33+
if (this.#willCloseAt && this.#willCloseAt < Date.now()) this.close()
34+
}
35+
36+
/**
37+
* Check if the circuit breaker is open
38+
*/
39+
isOpen() {
40+
this.#checkState()
41+
return this.#state === CircuitBreakerState.Open
42+
}
43+
44+
/**
45+
* Check if the circuit breaker is closed
46+
*/
47+
isClosed() {
48+
this.#checkState()
49+
return this.#state === CircuitBreakerState.Closed
50+
}
51+
52+
/**
53+
* Open the circuit breaker
54+
*/
55+
open() {
56+
if (this.#state === CircuitBreakerState.Open) return
57+
58+
this.#state = CircuitBreakerState.Open
59+
this.#willCloseAt = Date.now() + this.#breakDuration
60+
}
61+
62+
/**
63+
* Close the circuit breaker
64+
*/
65+
close() {
66+
this.#state = CircuitBreakerState.Closed
67+
this.#willCloseAt = null
68+
}
69+
}

packages/bentocache/src/types/options/options.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ export type RawCommonOptions = {
5656
* throws an error
5757
*/
5858
onFactoryError?: (error: FactoryError) => void
59+
60+
/**
61+
* Duration for the circuit breaker to stay open
62+
* if l2 cache fails
63+
*
64+
* @default null Means, no circuit breaker
65+
*/
66+
l2CircuitBreakerDuration?: Duration
5967
}
6068

6169
/**
@@ -82,7 +90,7 @@ export type RawBentoCacheOptions = {
8290
emitter?: Emitter
8391

8492
/**
85-
* Custom serialiser
93+
* Custom serializer
8694
*/
8795
serializer?: CacheSerializer
8896
} & RawCommonOptions

packages/bentocache/tests/cache/remote_cache.spec.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ import { testLogger } from '@julr/utils/logger'
44
import { REDIS_CREDENTIALS } from '../helpers/index.js'
55
import { RedisDriver } from '../../src/drivers/redis.js'
66
import { ChaosCache } from '../helpers/chaos/chaos_cache.js'
7-
import { JsonSerializer } from '../../src/serializers/json.js'
7+
import { BentoCacheOptions } from '../../src/bento_cache_options.js'
88
import { RemoteCache } from '../../src/cache/facades/remote_cache.js'
99
import { createCacheEntryOptions } from '../../src/cache/cache_entry/cache_entry_options.js'
1010

1111
test.group('Remote Cache', () => {
1212
test('should rethrows errors if suppressL2Errors is disabled', async ({ assert, cleanup }) => {
1313
const logger = testLogger()
1414
const chaosCacheDriver = new ChaosCache(new RedisDriver({ connection: REDIS_CREDENTIALS }))
15-
const cache = new RemoteCache(chaosCacheDriver, logger, new JsonSerializer(), true)
15+
const cache = new RemoteCache(chaosCacheDriver, logger, true, new BentoCacheOptions({}))
1616

1717
cleanup(() => chaosCacheDriver.disconnect())
1818

@@ -32,7 +32,7 @@ test.group('Remote Cache', () => {
3232
test('should ignore errors if suppressL2Errors is enabled', async ({ assert, cleanup }) => {
3333
const logger = testLogger()
3434
const chaosCacheDriver = new ChaosCache(new RedisDriver({ connection: REDIS_CREDENTIALS }))
35-
const cache = new RemoteCache(chaosCacheDriver, logger, new JsonSerializer(), true)
35+
const cache = new RemoteCache(chaosCacheDriver, logger, true, new BentoCacheOptions({}))
3636

3737
cleanup(() => chaosCacheDriver.disconnect())
3838

@@ -55,7 +55,7 @@ test.group('Remote Cache', () => {
5555
}) => {
5656
const logger = testLogger()
5757
const chaosCacheDriver = new ChaosCache(new RedisDriver({ connection: REDIS_CREDENTIALS }))
58-
const cache = new RemoteCache(chaosCacheDriver, logger, new JsonSerializer(), false)
58+
const cache = new RemoteCache(chaosCacheDriver, logger, false, new BentoCacheOptions({}))
5959

6060
cleanup(() => chaosCacheDriver.disconnect())
6161

@@ -78,7 +78,7 @@ test.group('Remote Cache', () => {
7878
}) => {
7979
const logger = testLogger()
8080
const chaosCacheDriver = new ChaosCache(new RedisDriver({ connection: REDIS_CREDENTIALS }))
81-
const cache = new RemoteCache(chaosCacheDriver, logger, new JsonSerializer(), false)
81+
const cache = new RemoteCache(chaosCacheDriver, logger, false, new BentoCacheOptions({}))
8282

8383
cleanup(() => chaosCacheDriver.disconnect())
8484

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { mock } from 'node:test'
2+
import { test } from '@japa/runner'
3+
import { sleep } from '@julr/utils/misc'
4+
5+
import { NullDriver } from './helpers/null/null_driver.js'
6+
import { CacheFactory } from '../factories/cache_factory.js'
7+
import { CircuitBreaker } from '../src/circuit_breaker/index.js'
8+
9+
test.group('Circuit breaking', () => {
10+
test('Simple circuit breaker should works', async ({ assert }) => {
11+
const cb = new CircuitBreaker({ breakDuration: 400 })
12+
13+
cb.open()
14+
cb.open()
15+
cb.open()
16+
17+
assert.isTrue(cb.isOpen())
18+
await sleep(200)
19+
assert.isTrue(cb.isOpen())
20+
await sleep(200)
21+
assert.isFalse(cb.isOpen())
22+
assert.isTrue(cb.isClosed())
23+
})
24+
25+
test('Circuit breaker should open when remote cache call fail', async ({ assert }) => {
26+
const get = mock.fn(() => {
27+
throw new Error('Unable to connect to remote cache')
28+
})
29+
30+
class L2Driver extends NullDriver {
31+
type = 'l2' as const
32+
33+
get() {
34+
return get()
35+
}
36+
}
37+
38+
const { cache } = new CacheFactory()
39+
.withMemoryL1()
40+
.merge({ l2Driver: new L2Driver({}), l2CircuitBreakerDuration: '200ms' })
41+
.create()
42+
43+
await cache.get({ key: 'foo' })
44+
await cache.get({ key: 'foo' })
45+
await cache.get({ key: 'foo' })
46+
47+
assert.deepEqual(get.mock.callCount(), 1)
48+
49+
// Wait for the circuit breaker to close
50+
await sleep(200)
51+
52+
await cache.get({ key: 'foo' })
53+
54+
assert.deepEqual(get.mock.callCount(), 2)
55+
}).disableTimeout()
56+
57+
test('should not have circuit breaker if l2CircuitBreakerDuration is not set', async ({
58+
assert,
59+
}) => {
60+
const get = mock.fn(() => {
61+
throw new Error('Unable to connect to remote cache')
62+
})
63+
64+
class L2Driver extends NullDriver {
65+
type = 'l2' as const
66+
67+
get() {
68+
return get()
69+
}
70+
}
71+
72+
const { cache } = new CacheFactory()
73+
.withMemoryL1()
74+
.merge({ l2Driver: new L2Driver({}) })
75+
.create()
76+
77+
await cache.get({ key: 'foo' })
78+
await cache.get({ key: 'foo' })
79+
await cache.get({ key: 'foo' })
80+
81+
assert.deepEqual(get.mock.callCount(), 3)
82+
})
83+
})

0 commit comments

Comments
 (0)