Skip to content

feat: add enableAwareness option #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ const decodeRedisRoomStreamName = (rediskey, expectedPrefix) => {

/**
* @param {import('./storage.js').AbstractStorage} store
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
*/
export const createApiClient = async (store, { redisPrefix, redisUrl }) => {
const a = new Api(store, redisPrefix, redisUrl)
export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwareness = true }) => {
const a = new Api(store, redisPrefix, redisUrl, { enableAwareness })
await a.redis.connect()
try {
await a.redis.xGroupCreate(a.redisWorkerStreamName, a.redisWorkerGroupName, '0', { MKSTREAM: true })
Expand All @@ -100,10 +100,13 @@ export class Api {
* @param {import('./storage.js').AbstractStorage} store
* @param {string=} prefix
* @param {string=} url
* @param {Object} opts
* @param {boolean=} opts.enableAwareness
*/
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis')) {
constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis'), { enableAwareness = true } = {}) {
this.store = store
this.prefix = prefix
this.enableAwareness = enableAwareness
this.consumername = random.uuidv4()
/**
* After this timeout, a new worker will pick up the task
Expand Down Expand Up @@ -240,8 +243,11 @@ export class Api {
if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`)
const docstate = await this.store.retrieveDoc(room, docid)
const ydoc = new Y.Doc()
const awareness = new awarenessProtocol.Awareness(ydoc)
awareness.setLocalState(null) // we don't want to propagate awareness state
let awareness = null
if (this.enableAwareness) {
awareness = new awarenessProtocol.Awareness(ydoc)
awareness.setLocalState(null) // we don't want to propagate awareness state
}
const now = performance.now()
if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) }
let changed = false
Expand All @@ -257,7 +263,9 @@ export class Api {
break
}
case 1: { // awareness message
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
if (this.enableAwareness && awareness) {
awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null)
}
break
}
}
Expand Down Expand Up @@ -394,7 +402,7 @@ export class Api {

/**
* @param {import('./storage.js').AbstractStorage} store
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
*/
export const createWorker = async (store, opts) => {
const a = await createApiClient(store, opts)
Expand Down
11 changes: 9 additions & 2 deletions src/socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ class YSocketIOServer {
* @param {string} [conf.redisUrl]
* @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate
* @param {import('worker_threads').Worker=} [conf.persistWorker]
* @param {boolean} [conf.enableAwareness]
*/
export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => {
const app = new YSocketIO(io, { authenticate })
export const registerYSocketIOServer = async (io, store, {
authenticate,
redisUrl,
redisPrefix,
persistWorker,
enableAwareness = true
}) => {
const app = new YSocketIO(io, { authenticate, enableAwareness })
const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker })
return new YSocketIOServer(app, client, subscriber)
}
2 changes: 1 addition & 1 deletion src/subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const run = async subscriber => {

/**
* @param {import('./storage.js').AbstractStorage} store
* @param {{ redisPrefix?: string, redisUrl?: string }} opts
* @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts
*/
export const createSubscriber = async (store, opts) => {
const client = await api.createApiClient(store, opts)
Expand Down
155 changes: 94 additions & 61 deletions src/y-socket-io/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import { io } from 'socket.io-client'
* @prop {boolean=} autoConnect
* (Optional) This boolean specify if the provider should connect when the instance is created, by default is true
*
* @prop {boolean=} enableAwareness
* (Optional) This boolean enable the awareness functionality, by default is true
*
* @prop {AwarenessProtocol.Awareness=} awareness
* (Optional) An existent awareness, by default is a new AwarenessProtocol.Awareness instance
*
Expand Down Expand Up @@ -73,9 +76,14 @@ export class SocketIOProvider extends Observable {
* @public
*/
doc
/**
* Enable awareness
* @type {boolean}
*/
enableAwareness
/**
* The awareness
* @type {AwarenessProtocol.Awareness}
* @type {AwarenessProtocol.Awareness=}
* @public
*/
awareness
Expand Down Expand Up @@ -126,7 +134,8 @@ export class SocketIOProvider extends Observable {
doc = new Y.Doc(),
{
autoConnect = true,
awareness = new AwarenessProtocol.Awareness(doc),
enableAwareness = true,
awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined,
resyncInterval = -1,
disableBc = false,
auth = {}
Expand All @@ -140,6 +149,8 @@ export class SocketIOProvider extends Observable {
this._url = url
this.roomName = roomName
this.doc = doc

this.enableAwareness = enableAwareness
this.awareness = awareness

this._broadcastChannel = `${url}/${roomName}`
Expand Down Expand Up @@ -167,12 +178,13 @@ export class SocketIOProvider extends Observable {

this.initSyncListeners()

this.initAwarenessListeners()
if (this.enableAwareness) {
this.initAwarenessListeners()
awareness?.on('update', this.awarenessUpdate)
}

this.initSystemListeners()

awareness.on('update', this.awarenessUpdate)

if (autoConnect) this.connect()
}

Expand Down Expand Up @@ -260,6 +272,8 @@ export class SocketIOProvider extends Observable {
*/
initAwarenessListeners = () => {
this.socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => {
if (!this.awareness) return

AwarenessProtocol.applyAwarenessUpdate(
this.awareness,
new Uint8Array(update),
Expand Down Expand Up @@ -310,7 +324,7 @@ export class SocketIOProvider extends Observable {
Y.applyUpdate(this.doc, new Uint8Array(update), this)
}
)
if (this.awareness.getLocalState() !== null) {
if (this.enableAwareness && this.awareness && this.awareness.getLocalState() !== null) {
this.socket.emit(
'awareness-update',
AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
Expand Down Expand Up @@ -355,13 +369,15 @@ export class SocketIOProvider extends Observable {

this.emit('connection-close', [event, this])
this.synced = false
AwarenessProtocol.removeAwarenessStates(
this.awareness,
Array.from(this.awareness.getStates().keys()).filter(
(client) => client !== this.doc.clientID
),
this
)
if (this.enableAwareness && this.awareness) {
AwarenessProtocol.removeAwarenessStates(
this.awareness,
Array.from(this.awareness.getStates().keys()).filter(
(client) => client !== this.doc.clientID
),
this
)
}
this.emit('status', [{ status: 'disconnected' }])
}

Expand All @@ -382,8 +398,10 @@ export class SocketIOProvider extends Observable {
if (this.resyncInterval != null) clearInterval(this.resyncInterval)
this.disconnect()
if (typeof window !== 'undefined') { window.removeEventListener('beforeunload', this.beforeUnloadHandler) } else if (typeof process !== 'undefined') { process.off('exit', this.beforeUnloadHandler) }
this.awareness.off('update', this.awarenessUpdate)
this.awareness.destroy()
if (this.enableAwareness) {
this.awareness?.off('update', this.awarenessUpdate)
this.awareness?.destroy()
}
this.doc.off('update', this.onUpdateDoc)
super.destroy()
}
Expand Down Expand Up @@ -429,6 +447,8 @@ export class SocketIOProvider extends Observable {
* @readonly
*/
awarenessUpdate = ({ added, updated, removed }, origin) => {
if (!this.awareness) return

const changedClients = added.concat(updated).concat(removed)
this.socket.emit(
'awareness-update',
Expand Down Expand Up @@ -457,6 +477,8 @@ export class SocketIOProvider extends Observable {
* @readonly
*/
beforeUnloadHandler = () => {
if (!this.enableAwareness || !this.awareness) return

AwarenessProtocol.removeAwarenessStates(
this.awareness,
[this.doc.clientID],
Expand Down Expand Up @@ -485,21 +507,24 @@ export class SocketIOProvider extends Observable {
{ type: 'sync-step-2', data: Y.encodeStateAsUpdate(this.doc) },
this
)
bc.publish(
this._broadcastChannel,
{ type: 'query-awareness', data: null },
this
)
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
this.doc.clientID
])
},
this
)

if (this.enableAwareness && this.awareness) {
bc.publish(
this._broadcastChannel,
{ type: 'query-awareness', data: null },
this
)
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [
this.doc.clientID
])
},
this
)
}
}

/**
Expand All @@ -509,18 +534,20 @@ export class SocketIOProvider extends Observable {
* @readonly
*/
disconnectBc = () => {
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(
this.awareness,
[this.doc.clientID],
new Map()
)
},
this
)
if (this.enableAwareness && this.awareness) {
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(
this.awareness,
[this.doc.clientID],
new Map()
)
},
this
)
}
if (this.bcconnected) {
bc.unsubscribe(this._broadcastChannel, this.onBroadcastChannelMessage)
this.bcconnected = false
Expand Down Expand Up @@ -556,27 +583,33 @@ export class SocketIOProvider extends Observable {
Y.applyUpdate(this.doc, new Uint8Array(message.data), this)
break

case 'query-awareness':
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(
this.awareness,
Array.from(this.awareness.getStates().keys())
)
},
this
)
case 'query-awareness': {
if (this.enableAwareness && this.awareness) {
bc.publish(
this._broadcastChannel,
{
type: 'awareness-update',
data: AwarenessProtocol.encodeAwarenessUpdate(
this.awareness,
Array.from(this.awareness.getStates().keys())
)
},
this
)
}
break

case 'awareness-update':
AwarenessProtocol.applyAwarenessUpdate(
this.awareness,
new Uint8Array(message.data),
this
)
}

case 'awareness-update': {
if (this.enableAwareness && this.awareness) {
AwarenessProtocol.applyAwarenessUpdate(
this.awareness,
new Uint8Array(message.data),
this
)
}
break
}

default:
break
Expand Down
Loading
Loading