Skip to content

Commit 3297898

Browse files
Manage forward communication and queue system with @scorpion9979
1 parent 4637788 commit 3297898

File tree

4 files changed

+74
-8
lines changed

4 files changed

+74
-8
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
'use strict'
2+
3+
exports.up = function (db) {
4+
return db.runSql(`
5+
CREATE TABLE queue (
6+
queue_id INTEGER PRIMARY KEY,
7+
sender INTEGER NOT NULL,
8+
recipient INTEGER,
9+
message TEXT
10+
);
11+
`)
12+
}
13+
14+
exports.down = function (db) {
15+
return db.runSql(`
16+
DROP TABLE queue;
17+
`)
18+
}

src/Instance.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,46 @@ module.exports = class Instance extends EventEmitter {
102102
channel: 'instance.to.client',
103103
action: 'pinged'
104104
}
105+
},
106+
{
107+
channel: 'client.to.node',
108+
callback: this.forwardClientToNode.bind(this)
105109
}
106110
]
107111
}
112+
async forwardClientToNode (protocol, sender, parameters, reply, forwardObject) {
113+
let onlineRecipient = this.online.nodes.find(n => n.id === forwardObject.recipient)
114+
if (onlineRecipient) {
115+
this.protocols[onlineRecipient.protocol].to(onlineRecipient.resource, 'client.to.node', forwardObject.action, parameters)
116+
reply({ msg: 'Message forwarded' })
117+
} else {
118+
const { success, results } = await this.db.run(` SELECT resource FROM nodes where node_id = ?`, forwardObject.recipient)
119+
if (success) {
120+
const resource = results[0].resource
121+
// TODO: find a better regex to split ws://923u10hdashdsh into ['ws', '923u10hdashdsh']
122+
const [, recipientProtocol, recipientResource] = resource.match(/(^\w+):\/\/([a-z0-9]+)/)
123+
if (this.protocols[recipientProtocol].needsQueue) {
124+
const { success, results } = await this.isClientTokenValid(parameters.token)
125+
// we don't want to pass the token to the recipient !!! TODO: change the authentication system, put the token at the level of action, properties, recipient
126+
parameters.token = ''
127+
if (success) {
128+
const client = results[0]
129+
const { success } = await this.db.run(`
130+
INSERT INTO queue
131+
(sender, recipient, message)
132+
VALUES
133+
(?, ?, ?)`, [client.client_id, forwardObject.recipient, parameters])
134+
if (success) {
135+
reply({ msg: 'Message stored in the queue' })
136+
}
137+
}
138+
} else {
139+
this.protocols[recipientProtocol].to(recipientResource, 'client.to.node', forwardObject.action, parameters)
140+
reply({ msg: 'Message forwarded' })
141+
}
142+
}
143+
}
144+
}
108145
loadListeners () {
109146
for (let protocolId in this.protocols) {
110147
let protocol = this.protocols[protocolId]

src/protocols/Http.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ class Http {
33
constructor (http) {
44
this.resource = http
55
this.ID = 'http'
6+
this.needsQueue = false
67
}
78
to (recipient, channel, action, parameters, response) {
89
fetch(recipient + channel + '/' + action, {

src/protocols/WebSockets.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,22 @@ class WebSockets {
22
constructor (ws) {
33
this.resource = ws
44
this.ID = 'ws'
5+
this.needsQueue = true
56
}
6-
to (recipient, channel, action, parameters, response) {
7-
if (recipient) {
8-
this.resource.to(recipient).emit(channel, {
9-
action: action,
10-
parameters: parameters
11-
})
7+
to (delivery, channel, action, parameters, response) {
8+
if (delivery) {
9+
if (typeof delivery === 'object') {
10+
this.resource.to(delivery.mediator).emit(channel, {
11+
action: action,
12+
parameters: parameters,
13+
recipient: delivery.recipient
14+
})
15+
} else if (typeof delivery === 'string') {
16+
this.resource.to(delivery).emit(channel, {
17+
action: action,
18+
parameters: parameters
19+
})
20+
}
1221
} else {
1322
this.resource.emit(channel, {
1423
action: action,
@@ -22,12 +31,13 @@ class WebSockets {
2231
on (channel, action, resource, callback, response) {
2332
let resourceId = resource.id
2433
resource.on(channel, data => {
25-
if (data.action === action) {
34+
if (!action || data.action === action) {
2635
callback(
2736
this.ID,
2837
resourceId,
2938
data.parameters,
30-
(parameters) => this.to(resourceId, response.channel, response.action, parameters, { listen: null, resource })
39+
(parameters) => this.to(resourceId, response.channel, response.action, parameters, { listen: null, resource }),
40+
{ recipient: data.recipient, action: action }
3141
)
3242
}
3343
})

0 commit comments

Comments
 (0)