diff --git a/package.json b/package.json index 522356f..c704cea 100644 --- a/package.json +++ b/package.json @@ -20,13 +20,14 @@ "dependencies": { "@tencent-sdk/capi": "^0.2.15-alpha.0", "dijkstrajs": "^1.0.1", + "dot-qs": "0.2.0", "duplexify": "^4.1.1", "end-of-stream": "^1.4.4", "https-proxy-agent": "^5.0.0", "socket.io-client": "^2.3.0", - "socket.io-stream": "^0.9.1", - "winston": "^3.2.1", - "dot-qs": "0.2.0" + "component-bind": "~1.0.0", + "debug": "~2.6.9", + "winston": "^3.2.1" }, "devDependencies": { "babel-eslint": "9.0.0", @@ -36,4 +37,4 @@ "eslint-plugin-prettier": "^3.0.1", "prettier": "^1.18.2" } -} +} \ No newline at end of file diff --git a/sdk/debug/lib/wshub-client/index.js b/sdk/debug/lib/wshub-client/index.js index a53576d..ee951df 100644 --- a/sdk/debug/lib/wshub-client/index.js +++ b/sdk/debug/lib/wshub-client/index.js @@ -44,7 +44,7 @@ var __spreadArrays = (this && this.__spreadArrays) || function () { }; Object.defineProperty(exports, "__esModule", { value: true }); var socketIo = require("socket.io-client"); -var socketIoStream = require("socket.io-stream"); +var socketIoStream = require("../wshub-socket.io-stream"); var net_1 = require("net"); var wshub_bipipe_1 = require("../wshub-bipipe"); var wshub_proxy_1 = require("../wshub-proxy"); diff --git a/sdk/debug/lib/wshub-socket.io-stream/index.js b/sdk/debug/lib/wshub-socket.io-stream/index.js new file mode 100644 index 0000000..e01e3d1 --- /dev/null +++ b/sdk/debug/lib/wshub-socket.io-stream/index.js @@ -0,0 +1,64 @@ +var Socket = require('./socket'); +var IOStream = require('./iostream'); + + +exports = module.exports = lookup; + +/** + * Expose Node Buffer for browser. + * + * @api public + */ +exports.Buffer = Buffer; + +/** + * Expose Socket constructor. + * + * @api public + */ +exports.Socket = Socket; + +/** + * Expose IOStream constructor. + * + * @api public + */ +exports.IOStream = IOStream; + +/** + * Forces base 64 encoding when emitting. Must be set to true for Socket.IO v0.9 or lower. + * + * @api public + */ +exports.forceBase64 = false; + +/** + * Look up an existing Socket. + * + * @param {socket.io#Socket} socket.io + * @param {Object} options + * @return {Socket} Socket instance + * @api public + */ +function lookup(sio, options) { + options = options || {}; + if (null == options.forceBase64) { + options.forceBase64 = exports.forceBase64; + } + + if (!sio._streamSocket) { + sio._streamSocket = new Socket(sio, options); + } + return sio._streamSocket; +} + +/** + * Creates a new duplex stream. + * + * @param {Object} options + * @return {IOStream} duplex stream + * @api public + */ +exports.createStream = function(options) { + return new IOStream(options); +}; diff --git a/sdk/debug/lib/wshub-socket.io-stream/iostream.js b/sdk/debug/lib/wshub-socket.io-stream/iostream.js new file mode 100644 index 0000000..4f8ef7a --- /dev/null +++ b/sdk/debug/lib/wshub-socket.io-stream/iostream.js @@ -0,0 +1,265 @@ +var util = require('util'); +var Duplex = require('stream').Duplex; +var bind = require('component-bind'); +var uuid = require('./uuid'); +var debug = require('debug')('socket.io-stream:iostream'); + + +module.exports = IOStream; + +util.inherits(IOStream, Duplex); + +/** + * Duplex + * + * @param {Object} options + * @api private + */ +function IOStream(options) { + if (!(this instanceof IOStream)) { + return new IOStream(options); + } + + IOStream.super_.call(this, options); + + this.options = options; + this.id = uuid(); + this.socket = null; + + // Buffers + this.pushBuffer = []; + this.writeBuffer = []; + + // Op states + this._readable = false; + this._writable = false; + this.destroyed = false; + + // default to *not* allowing half open sockets + this.allowHalfOpen = options && options.allowHalfOpen || false; + + this.on('finish', this._onfinish); + this.on('end', this._onend); + this.on('error', this._onerror); +} + +/** + * Ensures that no more I/O activity happens on this stream. + * Not necessary in the usual case. + * + * @api public + */ +IOStream.prototype.destroy = function() { + debug('destroy'); + + if (this.destroyed) { + debug('already destroyed'); + return; + } + + this.readable = this.writable = false; + + if (this.socket) { + debug('clean up'); + this.socket.cleanup(this.id); + this.socket = null; + } + + this.destroyed = true; +}; + +/** + * Local read + * + * @api private + */ +IOStream.prototype._read = function(size) { + var push; + + // We can not read from the socket if it's destroyed obviously ... + if (this.destroyed) return; + + if (this.pushBuffer.length) { + // flush buffer and end if it exists. + while (push = this.pushBuffer.shift()) { + if (!push()) break; + } + return; + } + + this._readable = true; + + // Go get data from remote stream + // Calls + // ._onread remotely + // then + // ._onwrite locally + this.socket._read(this.id, size); +}; + + +/** + * Read from remote stream + * + * @api private + */ +IOStream.prototype._onread = function(size) { + var write = this.writeBuffer.shift(); + if (write) return write(); + + this._writable = true; +}; + +/** + * Write local data to remote stream + * Calls + * remtote ._onwrite + * + * @api private + */ +IOStream.prototype._write = function(chunk, encoding, callback) { + var self = this; + + function write() { + // We can not write to the socket if it's destroyed obviously ... + if (self.destroyed) return; + + self._writable = false; + self.socket._write(self.id, chunk, encoding, callback); + } + + if (this._writable) { + write(); + } else { + this.writeBuffer.push(write); + } +}; + +/** + * Write the data fetched remotely + * so that we can now read locally + * + * @api private + */ +IOStream.prototype._onwrite = function(chunk, encoding, callback) { + var self = this; + + function push() { + self._readable = false; + var ret = self.push(chunk || '', encoding); + callback(); + return ret; + } + + if (this._readable) { + push(); + } else { + this.pushBuffer.push(push); + } +}; + +/** + * When ending send 'end' event to remote stream + * + * @api private + */ +IOStream.prototype._end = function() { + if (this.pushBuffer.length) { + // end after flushing buffer. + this.pushBuffer.push(bind(this, '_done')); + } else { + this._done(); + } +}; + +/** + * Remote stream just ended + * + * @api private + */ +IOStream.prototype._done = function() { + this._readable = false; + + // signal the end of the data. + return this.push(null); +}; + +/** + * the user has called .end(), and all the bytes have been + * sent out to the other side. + * If allowHalfOpen is false, or if the readable side has + * ended already, then destroy. + * If allowHalfOpen is true, then we need to set writable false, + * so that only the writable side will be cleaned up. + * + * @api private + */ +IOStream.prototype._onfinish = function() { + debug('_onfinish'); + // Local socket just finished + // send 'end' event to remote + if (this.socket) { + this.socket._end(this.id); + } + + this.writable = false; + this._writableState.ended = true; + + if (!this.readable || this._readableState.ended) { + debug('_onfinish: ended, destroy %s', this._readableState); + return this.destroy(); + } + + debug('_onfinish: not ended'); + + if (!this.allowHalfOpen) { + this.push(null); + + // just in case we're waiting for an EOF. + if (this.readable && !this._readableState.endEmitted) { + this.read(0); + } + } +}; + +/** + * the EOF has been received, and no more bytes are coming. + * if the writable side has ended already, then clean everything + * up. + * + * @api private + */ +IOStream.prototype._onend = function() { + debug('_onend'); + this.readable = false; + this._readableState.ended = true; + + if (!this.writable || this._writableState.finished) { + debug('_onend: %s', this._writableState); + return this.destroy(); + } + + debug('_onend: not finished'); + + if (!this.allowHalfOpen) { + this.end(); + } +}; + +/** + * When error in local stream + * notyify remote + * if err.remote = true + * then error happened on remote stream + * + * @api private + */ +IOStream.prototype._onerror = function(err) { + // check if the error came from remote stream. + if (!err.remote && this.socket) { + // notify the error to the corresponding remote stream. + this.socket._error(this.id, err); + } + + this.destroy(); +}; diff --git a/sdk/debug/lib/wshub-socket.io-stream/parser.js b/sdk/debug/lib/wshub-socket.io-stream/parser.js new file mode 100644 index 0000000..a1c32f5 --- /dev/null +++ b/sdk/debug/lib/wshub-socket.io-stream/parser.js @@ -0,0 +1,105 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var IOStream = require('./iostream'); +var slice = Array.prototype.slice; + +exports.Encoder = Encoder; +exports.Decoder = Decoder; + +util.inherits(Encoder, EventEmitter); + +function Encoder() { + EventEmitter.call(this); +} + +/** + * Encode streams to placeholder objects. + * + * @api public + */ +Encoder.prototype.encode = function(v) { + if (v instanceof IOStream) { + return this.encodeStream(v); + } else if (util.isArray(v)) { + return this.encodeArray(v); + } else if (v && 'object' == typeof v) { + return this.encodeObject(v); + } + return v; +} + +Encoder.prototype.encodeStream = function(stream) { + this.emit('stream', stream); + + // represent a stream in an object. + var v = { $stream: stream.id }; + if (stream.options) { + v.options = stream.options; + } + return v; +} + +Encoder.prototype.encodeArray = function(arr) { + var v = []; + for (var i = 0, len = arr.length; i < len; i++) { + v.push(this.encode(arr[i])); + } + return v; +} + +Encoder.prototype.encodeObject = function(obj) { + var v = {}; + for (var k in obj) { + if (obj.hasOwnProperty(k)) { + v[k] = this.encode(obj[k]); + } + } + return v; +} + +util.inherits(Decoder, EventEmitter); + +function Decoder() { + EventEmitter.call(this); +} + +/** + * Decode placeholder objects to streams. + * + * @api public + */ +Decoder.prototype.decode = function(v) { + if (v && v.$stream) { + return this.decodeStream(v); + } else if (util.isArray(v)) { + return this.decodeArray(v); + } else if (v && 'object' == typeof v) { + return this.decodeObject(v); + } + return v; +} + +Decoder.prototype.decodeStream = function(obj) { + var stream = new IOStream(obj.options); + stream.id = obj.$stream; + this.emit('stream', stream); + return stream; +} + +Decoder.prototype.decodeArray = function(arr) { + var v = []; + for (var i = 0, len = arr.length; i < len; i++) { + v.push(this.decode(arr[i])); + } + return v; +} + +Decoder.prototype.decodeObject = function(obj) { + var v = {}; + for (var k in obj) { + if (obj.hasOwnProperty(k)) { + v[k] = this.decode(obj[k]); + } + } + return v; +} diff --git a/sdk/debug/lib/wshub-socket.io-stream/socket.js b/sdk/debug/lib/wshub-socket.io-stream/socket.js new file mode 100644 index 0000000..633bb0c --- /dev/null +++ b/sdk/debug/lib/wshub-socket.io-stream/socket.js @@ -0,0 +1,287 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var bind = require('component-bind'); +var IOStream = require('./iostream'); +var parser = require('./parser'); +var debug = require('debug')('socket.io-stream:socket'); +var emit = EventEmitter.prototype.emit; +var on = EventEmitter.prototype.on; +var slice = Array.prototype.slice; + + +exports = module.exports = Socket; + +/** + * Base event name for messaging. + * + * @api public + */ +exports.event = '$stream'; + +exports.events = [ + 'error', + 'newListener', + 'removeListener' +]; + +util.inherits(Socket, EventEmitter); + +/** + * Bidirectional stream socket which wraps Socket.IO. + * + * @param {socket.io#Socket} socket.io + * @api public + */ +function Socket(sio, options) { + if (!(this instanceof Socket)) { + return new Socket(sio, options); + } + + EventEmitter.call(this); + + options = options || {}; + + this.sio = sio; + this.forceBase64 = !!options.forceBase64; + this.streams = {}; + this.encoder = new parser.Encoder(); + this.decoder = new parser.Decoder(); + + var eventName = exports.event; + sio.on(eventName, bind(this, emit)); + sio.on(eventName + '-read', bind(this, '_onread')); + sio.on(eventName + '-write', bind(this, '_onwrite')); + sio.on(eventName + '-end', bind(this, '_onend')); + sio.on(eventName + '-error', bind(this, '_onerror')); + sio.on('error', bind(this, emit, 'error')); + sio.on('disconnect', bind(this, '_ondisconnect')); + + this.encoder.on('stream', bind(this, '_onencode')); + this.decoder.on('stream', bind(this, '_ondecode')); +} + +/** + * Original emit function. + * + * @api private + */ +Socket.prototype.$emit = emit; + +/** + * Emits streams to this corresponding server/client. + * + * @return {Socket} self + * @api public + */ +Socket.prototype.emit = function(type) { + if (~exports.events.indexOf(type)) { + return emit.apply(this, arguments); + } + this._stream.apply(this, arguments); + return this; +}; + +Socket.prototype.on = function(type, listener) { + if (~exports.events.indexOf(type)) { + return on.apply(this, arguments); + } + + this._onstream(type, listener); + return this; +}; + +/** + * Sends a new stream request. + * + * @param {String} event type + * @api private + */ +Socket.prototype._stream = function(type) { + debug('sending new streams'); + + var self = this; + var args = slice.call(arguments, 1); + var ack = args[args.length - 1]; + if ('function' == typeof ack) { + args[args.length - 1] = function() { + var args = slice.call(arguments); + args = self.decoder.decode(args); + ack.apply(this, args); + }; + } + + args = this.encoder.encode(args); + var sio = this.sio; + sio.emit.apply(sio, [exports.event, type].concat(args)); +}; + +/** + * Notifies the read event. + * + * @api private + */ +Socket.prototype._read = function(id, size) { + this.sio.emit(exports.event + '-read', id, size); +}; + +/** + * Requests to write a chunk. + * + * @api private + */ +Socket.prototype._write = function(id, chunk, encoding, callback) { + if (Buffer.isBuffer(chunk)) { + if (this.forceBase64) { + encoding = 'base64'; + chunk = chunk.toString(encoding); + } else if (!global.Buffer) { + // socket.io can't handle Buffer when using browserify. + if (chunk.toArrayBuffer) { + chunk = chunk.toArrayBuffer(); + } else { + chunk = chunk.buffer; + } + } + } + this.sio.emit(exports.event + '-write', id, chunk, encoding, callback); +}; + +Socket.prototype._end = function(id) { + this.sio.emit(exports.event + '-end', id); +}; + +Socket.prototype._error = function(id, err) { + this.sio.emit(exports.event + '-error', id, err.message || err); +}; + +/** + * Handles a new stream request. + * + * @param {String} event type + * @param {Function} listener + * + * @api private + */ +Socket.prototype._onstream = function(type, listener) { + if ('function' != typeof listener) { + throw TypeError('listener must be a function'); + } + + function onstream() { + debug('new streams'); + var self = this; + var args = slice.call(arguments); + var ack = args[args.length - 1]; + if ('function' == typeof ack) { + args[args.length - 1] = function() { + var args = slice.call(arguments); + args = self.encoder.encode(args); + ack.apply(this, args); + }; + } + + args = this.decoder.decode(args); + listener.apply(this, args); + } + + // for removeListener + onstream.listener = listener; + + on.call(this, type, onstream); +}; + +Socket.prototype._onread = function(id, size) { + debug('read: "%s"', id); + + var stream = this.streams[id]; + if (stream) { + stream._onread(size); + } else { + debug('ignore invalid stream id'); + } +}; + +Socket.prototype._onwrite = function(id, chunk, encoding, callback) { + debug('write: "%s"', id); + + var stream = this.streams[id]; + if (!stream) { + callback('invalid stream id: ' + id); + return; + } + + if (global.ArrayBuffer && chunk instanceof ArrayBuffer) { + // make sure that chunk is a buffer for stream + chunk = new Buffer(new Uint8Array(chunk)); + } + stream._onwrite(chunk, encoding, callback); +}; + +Socket.prototype._onend = function(id) { + debug('end: "%s"', id); + + var stream = this.streams[id]; + if (!stream) { + debug('ignore non-existent stream id: "%s"', id); + return; + } + + stream._end(); +}; + +Socket.prototype._onerror = function(id, message) { + debug('error: "%s", "%s"', id, message); + + var stream = this.streams[id]; + if (!stream) { + debug('invalid stream id: "%s"', id); + return; + } + + var err = new Error(message); + err.remote = true; + stream.emit('error', err); +}; + +Socket.prototype._ondisconnect = function() { + var stream; + for (var id in this.streams) { + stream = this.streams[id]; + stream.destroy(); + + // Close streams when the underlaying + // socket.io connection is closed (regardless why) + stream.emit('close'); + stream.emit('error', new Error('Connection aborted')); + } +}; + +Socket.prototype._onencode = function(stream) { + if (stream.socket || stream.destroyed) { + throw new Error('stream has already been sent.'); + } + + var id = stream.id; + if (this.streams[id]) { + throw new Error('Encoded stream already exists: ' + id); + } + + this.streams[id] = stream; + stream.socket = this; +}; + +Socket.prototype._ondecode = function(stream) { + var id = stream.id; + if (this.streams[id]) { + this._error(id, new Error('Decoded stream already exists: ' + id)); + return; + } + + this.streams[id] = stream; + stream.socket = this; +}; + +Socket.prototype.cleanup = function(id) { + delete this.streams[id]; +}; + diff --git a/sdk/debug/lib/wshub-socket.io-stream/uuid.js b/sdk/debug/lib/wshub-socket.io-stream/uuid.js new file mode 100644 index 0000000..4a11ab4 --- /dev/null +++ b/sdk/debug/lib/wshub-socket.io-stream/uuid.js @@ -0,0 +1,25 @@ +// UUID function from https://gist.github.com/jed/982883 +// More lightweight than node-uuid +function b( + a // placeholder +){ + return a // if the placeholder was passed, return + ? ( // a random number from 0 to 15 + a ^ // unless b is 8, + Math.random() // in which case + * 16 // a random number from + >> a/4 // 8 to 11 + ).toString(16) // in hexadecimal + : ( // or otherwise a concatenated string: + [1e7] + // 10000000 + + -1e3 + // -1000 + + -4e3 + // -4000 + + -8e3 + // -80000000 + + -1e11 // -100000000000, + ).replace( // replacing + /[018]/g, // zeroes, ones, and eights with + b // random hex digits + ) +} + +module.exports = b;