Skip to content

Commit 1cb256a

Browse files
committed
refactor: bus
1 parent 393f316 commit 1cb256a

21 files changed

+100
-767
lines changed

packages/bentocache/benchmarks/encoders.ts

-52
This file was deleted.

packages/bentocache/factories/cache_factory.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import lodash from '@poppinss/utils/lodash'
22
import { getActiveTest } from '@japa/runner'
3+
import { MemoryTransport } from '@rlanz/bus/drivers/memory'
34

45
import { Cache } from '../src/cache/cache.js'
56
import { RedisDriver } from '../src/drivers/redis.js'
67
import { MemoryDriver } from '../src/drivers/memory.js'
7-
import { MemoryBus } from '../src/bus/drivers/memory_bus.js'
88
import type { CacheStackDrivers } from '../src/types/main.js'
99
import { CacheStack } from '../src/cache/stack/cache_stack.js'
1010
import { BentoCacheOptions } from '../src/bento_cache_options.js'
@@ -80,7 +80,7 @@ export class CacheFactory {
8080
withL1L2Config() {
8181
this.#parameters.l1Driver ??= new MemoryDriver({ maxSize: 100, prefix: 'test' })
8282
this.#parameters.l2Driver ??= new RedisDriver({ connection: { host: '127.0.0.1', port: 6379 } })
83-
this.#parameters.busDriver ??= new MemoryBus()
83+
this.#parameters.busDriver ??= new MemoryTransport()
8484

8585
return this
8686
}

packages/bentocache/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"dependencies": {
7070
"@paralleldrive/cuid2": "^2.2.2",
7171
"@poppinss/utils": "^6.7.3",
72+
"@rlanz/bus": "^0.2.0",
7273
"async-mutex": "^0.5.0",
7374
"chunkify": "^5.0.0",
7475
"hexoid": "^1.0.0",

packages/bentocache/src/bus/bus.ts

+17-92
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
import { Bus as RlanzBus } from '@rlanz/bus'
12
import { createId } from '@paralleldrive/cuid2'
3+
import type { Transport } from '@rlanz/bus/types/main'
24

3-
import { RetryQueue } from './retry_queue.js'
45
import { CacheBusMessageType } from '../types/bus.js'
56
import type { LocalCache } from '../cache/facades/local_cache.js'
67
import { BusMessageReceived } from '../events/bus/bus_message_received.js'
78
import { BusMessagePublished } from '../events/bus/bus_message_published.js'
8-
import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'
9+
import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'
910

1011
/**
1112
* The bus is used to notify other processes about cache changes.
@@ -17,94 +18,51 @@ import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '..
1718
* local cache accordingly.
1819
*/
1920
export class Bus {
20-
/**
21-
* The underlying bus driver
22-
*/
23-
#driver: BusDriver
24-
2521
/**
2622
* The local cache that will be updated when a message is received
2723
*/
2824
#cache?: LocalCache
2925

30-
/**
31-
* The logger to use
32-
*/
33-
#logger: Logger
34-
35-
/**
36-
* Emitter
37-
*/
38-
#emitter: Emitter
39-
4026
/**
4127
* A unique identifier for this bus instance
4228
* that is used to prevent the bus from
4329
* emitting events to itself
4430
*/
4531
#busId = createId()
46-
47-
/**
48-
* The channel name to use
49-
*/
32+
#bus: RlanzBus
33+
#logger: Logger
34+
#emitter: Emitter
5035
#channelName = 'bentocache.notifications'
5136

52-
/**
53-
* The error retry queue holds messages that failed to be sent
54-
*/
55-
#errorRetryQueue = new RetryQueue()
56-
5737
constructor(
58-
driver: BusDriver,
38+
driver: Transport,
5939
cache: LocalCache,
6040
logger: Logger,
6141
emitter: Emitter,
6242
options: BusOptions = {},
6343
) {
64-
this.#driver = driver
6544
this.#cache = cache
6645
this.#emitter = emitter
6746
this.#logger = logger.child({ context: 'bentocache.bus' })
68-
this.#errorRetryQueue = new RetryQueue(options.retryQueue?.enabled, options.retryQueue?.maxSize)
6947

70-
driver
71-
.setId(this.#busId)
72-
.setLogger(this.#logger)
73-
.onReconnect(() => this.#onReconnect())
74-
}
48+
this.#bus = new RlanzBus(driver, {
49+
retryQueue: { ...options.retryQueue, removeDuplicates: true, retryInterval: false },
50+
})
7551

76-
/**
77-
* Process the error retry queue
78-
*/
79-
async #processErrorRetryQueue() {
80-
this.#logger.debug(
81-
`starting error retry queue processing with ${this.#errorRetryQueue.size()} messages`,
52+
this.#bus.subscribe<CacheBusMessage & { [key: string]: any }>(
53+
this.#channelName,
54+
this.#onMessage.bind(this),
8255
)
83-
84-
await this.#errorRetryQueue.process(async (message) => {
85-
await this.publish(message)
86-
return true
87-
})
8856
}
8957

9058
/**
9159
* When a message is received through the bus.
9260
* This is where we update the local cache.
9361
*/
9462
async #onMessage(message: CacheBusMessage) {
95-
/**
96-
* Since we received a message from the bus, we assume that
97-
* the Bus is working. So we can try process the error retry queue if
98-
* there are any messages in it.
99-
*/
100-
await this.#processErrorRetryQueue()
101-
10263
this.#logger.trace({ keys: message.keys, type: message.type }, 'received message from bus')
10364
this.#emitter.emit('bus:message:received', new BusMessageReceived(message))
10465

105-
/**
106-
* Process the message
107-
*/
10866
if (message.type === CacheBusMessageType.Delete) {
10967
for (const key of message.keys) this.#cache?.delete(key)
11068
}
@@ -118,52 +76,19 @@ export class Bus {
11876
}
11977
}
12078

121-
/**
122-
* When the bus driver reconnects after a disconnection
123-
*/
124-
async #onReconnect() {
125-
this.#logger.debug('bus driver reconnected')
126-
await this.#processErrorRetryQueue()
127-
}
128-
129-
/**
130-
* Subscribe to the bus channel
131-
*/
132-
async subscribe() {
133-
this.#driver.subscribe(this.#channelName, this.#onMessage.bind(this))
134-
}
135-
13679
/**
13780
* Publish a message to the bus channel
13881
*
13982
* @returns true if the message was published, false if not
14083
*/
14184
async publish(message: Omit<CacheBusMessage, 'busId'>): Promise<boolean> {
142-
const fullMessage = { ...message, busId: this.#busId }
143-
14485
try {
145-
this.#logger.trace({ keys: message.keys, type: message.type }, 'publishing message to bus')
146-
147-
/**
148-
* Publish the message to the bus using the underlying driver
149-
*/
150-
await this.#driver.publish(this.#channelName, fullMessage)
151-
152-
/**
153-
* Emit the bus:message:published event
154-
*/
86+
const fullMessage = { ...message, busId: this.#busId }
87+
await this.#bus.publish(this.#channelName, message)
15588
this.#emitter.emit('bus:message:published', new BusMessagePublished(fullMessage))
15689
return true
15790
} catch (error) {
158-
this.#logger.error(error, 'failed to publish message to bus')
159-
160-
/**
161-
* Add to the error retry queue
162-
*/
163-
const wasAdded = this.#errorRetryQueue.enqueue(fullMessage)
164-
if (!wasAdded) return false
165-
166-
this.#logger.debug(message, 'added message to error retry queue')
91+
this.#logger.error({ error }, 'failed to publish message to bus')
16792
return false
16893
}
16994
}
@@ -172,6 +97,6 @@ export class Bus {
17297
* Disconnect the bus
17398
*/
17499
async disconnect(): Promise<void> {
175-
this.#driver.disconnect()
100+
await this.#bus.disconnect()
176101
}
177102
}

packages/bentocache/src/bus/drivers/memory_bus.ts

-92
This file was deleted.

0 commit comments

Comments
 (0)