From b8de80d5c82552a3ffb8329285d2b466b6ebd7e6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 8 Aug 2016 15:14:28 +0200 Subject: [PATCH] feat(pull): migration to pull-streams --- package.json | 28 ++-- src/agreement.js | 77 ++++++++++ src/dialer.js | 118 ++++++++------- src/listener.js | 128 ++++++++-------- test/{multistream.spec.js => index.spec.js} | 155 ++++++++------------ 5 files changed, 281 insertions(+), 225 deletions(-) create mode 100644 src/agreement.js rename test/{multistream.spec.js => index.spec.js} (61%) diff --git a/package.json b/package.json index 5c4f54d..66e4f8c 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "multistream-select", "version": "0.10.0", "description": "JavaScript implementation of the multistream spec", - "main": "lib/index.js", + "main": "src/index.js", "jsnext:main": "src/index.js", "scripts": { "lint": "aegir-lint", @@ -41,19 +41,23 @@ "author": "David Dias ", "license": "MIT", "dependencies": { - "babel-runtime": "^6.6.1", - "length-prefixed-stream": "^1.5.0", - "lodash.range": "^3.1.5", - "run-series": "^1.1.4", - "varint": "^4.0.0" + "babel-runtime": "^6.11.6", + "debug": "^2.2.0", + "interface-connection": "^0.1.8", + "lodash.isfunction": "^3.0.8", + "lodash.range": "^3.1.7", + "pull-handshake": "^1.1.3", + "pull-length-prefixed": "^1.0.0", + "pull-stream": "^3.4.3", + "varint": "^4.0.1" }, "devDependencies": { - "aegir": "^3.1.0", - "run-parallel": "^1.1.6", - "bl": "^1.1.2", + "aegir": "^6.0.0", "chai": "^3.5.0", - "pre-commit": "^1.1.2", - "stream-pair": "^1.0.3" + "pre-commit": "^1.1.3", + "pull-pair": "^1.1.0", + "run-parallel": "^1.1.6", + "run-series": "^1.1.4" }, "contributors": [ "David Dias ", @@ -61,4 +65,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/agreement.js b/src/agreement.js new file mode 100644 index 0000000..6f6761c --- /dev/null +++ b/src/agreement.js @@ -0,0 +1,77 @@ +'use strict' + +const handshake = require('pull-handshake') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:multistream:agreement') +log.error = debug('libp2p:multistream:agreement:error') + +exports.dial = (header, cb) => { + const stream = handshake({timeout: 60 * 1000}, cb) + const shake = stream.handshake + + log('writing header %s', header) + writeEncoded(shake, new Buffer(header + '\n'), cb) + + lp.decodeFromReader(shake, (err, data) => { + if (err) return cb(err) + const protocol = data.toString().slice(0, -1) + if (protocol !== header) { + cb(new Error(`Unkown header: "${protocol}"`)) + } + + log('header ack') + cb(null, shake.rest()) + }) + + return stream +} + +exports.listen = (rawConn, handlersMap, defaultHandler) => { + const cb = (err) => { + // TODO: pass errors somewhere + log.error(err) + } + const stream = handshake({timeout: 60 * 1000}, cb) + const shake = stream.handshake + + lp.decodeFromReader(shake, (err, data) => { + if (err) return cb(err) + log('received: %s', data.toString()) + const protocol = data.toString().slice(0, -1) + const [key] = Object.keys(handlersMap).filter((id) => id === protocol) + + if (key) { + log('ack: %s', protocol) + writeEncoded(shake, data, cb) + handlersMap[key](new Connection(shake.rest(), rawConn)) + } else { + log('unkown protocol: %s', protocol) + defaultHandler(protocol, shake.rest()) + } + }) + + return stream +} + +function encode (msg, cb) { + const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)] + + pull( + pull.values(values), + lp.encode(), + pull.collect((err, encoded) => { + if (err) return cb(err) + cb(null, encoded[0]) + }) + ) +} + +function writeEncoded (writer, msg, cb) { + encode(msg, (err, msg) => { + if (err) return cb(err) + writer.write(msg) + }) +} diff --git a/src/dialer.js b/src/dialer.js index 1a0b272..eecb70f 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,78 +1,88 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') const varint = require('varint') -const range = require('lodash.range') -const series = require('run-series') +const pull = require('pull-stream') +const Connection = require('interface-connection').Connection +const debug = require('debug') +const log = debug('libp2p:multistream:dialer') -exports = module.exports = Dialer +const PROTOCOL_ID = require('./protocol-id') +const agreement = require('./agreement') -function Dialer () { - if (!(this instanceof Dialer)) { - return new Dialer() +module.exports = class Dialer { + constructor () { + this.conn = null } - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - encode.write(new Buffer(PROTOCOL_ID + '\n')) - conn = _conn - callback() - } else { - callback(new Error('Incompatible multistream')) + handle (rawConn, cb) { + log('handling connection') + const ms = agreement.dial(PROTOCOL_ID, (err, conn) => { + if (err) { + return cb(err) } + log('handshake success') + + this.conn = new Connection(conn, rawConn) + + cb() }) + pull(rawConn, ms, rawConn) } - this.select = (protocol, callback) => { - if (!conn) { - return callback(new Error('multistream handshake has not finalized yet')) + select (protocol, cb) { + log('selecting %s', protocol) + if (!this.conn) { + return cb(new Error('multistream handshake has not finalized yet')) } - encode.write(new Buffer(protocol + '\n')) - decode.once('data', function (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - if (msg === protocol) { - return callback(null, conn) - } - if (msg === 'na') { - return callback(new Error(protocol + ' not supported')) + const stream = agreement.dial(protocol, (err, conn) => { + if (err) { + return cb(err) } + // TODO: handle 'na' + cb(null, new Connection(conn, this.conn)) }) + + pull(this.conn, stream, this.conn) } - this.ls = (callback) => { - encode.write(new Buffer('ls' + '\n')) - let protos = [] - decode.once('data', function (msgBuffer) { - const size = varint.decode(msgBuffer) // eslint-disable-line - const nProtos = varint.decode(msgBuffer, varint.decode.bytes) - - timesSeries(nProtos, (n, next) => { - decode.once('data', function (msgBuffer) { - protos.push(msgBuffer.toString().slice(0, -1)) - next() + ls (cb) { + const ls = agreement.dial('ls', (err, conn) => { + if (err) return cb(err) + + pull( + conn, + lp.decode(), + collectLs(conn), + pull.map(stringify), + pull.collect((err, list) => { + if (err) return cb(err) + return cb(null, list.slice(1)) }) - }, (err) => { - if (err) { - return callback(err) - } - callback(null, protos) - }) + ) }) + + pull(this.conn, ls, this.conn) } } -function timesSeries (i, work, callback) { - series(range(i).map((i) => (callback) => work(i, callback)), callback) +function stringify (buf) { + return buf.toString().slice(0, -1) +} + +function collectLs (conn) { + let first = true + let counter = 0 + + return pull.take((msg) => { + if (first) { + const size = varint.decode(msg) // eslint-disable-line + counter = varint.decode(msg, varint.decode.bytes) + return true + } + + return counter-- > 0 + }) } diff --git a/src/listener.js b/src/listener.js index ead8ee5..9bc8d62 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,83 +1,87 @@ 'use strict' -const lps = require('length-prefixed-stream') -const PROTOCOL_ID = require('./protocol-id') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') const varint = require('varint') +const isFunction = require('lodash.isfunction') +const assert = require('assert') +const debug = require('debug') +const log = debug('libp2p:multistream:listener') -exports = module.exports = Listener +const PROTOCOL_ID = require('./protocol-id') +const agreement = require('./agreement') -function Listener () { - if (!(this instanceof Listener)) { - return new Listener() +module.exports = class Listener { + constructor () { + this.handlers = { + ls: (conn) => this._ls(conn) + } } - const handlers = {} - const encode = lps.encode() - const decode = lps.decode() - let conn - // perform the multistream handshake - this.handle = (_conn, callback) => { - encode.pipe(_conn) - _conn.pipe(decode) - - encode.write(new Buffer(PROTOCOL_ID + '\n')) + handle (conn, cb) { + log('handling connection') + const ms = agreement.listen(conn, { + [PROTOCOL_ID]: (conn) => { + log('handshake success') + const msgHandler = agreement.listen(conn, this.handlers, (protocol, conn) => { + log('unkown protocol: %s', protocol) + pull( + pull.values([new Buffer('na')]), + conn + ) + }) + pull(conn, msgHandler, conn) - decode.once('data', (buffer) => { - const msg = buffer.toString().slice(0, -1) - if (msg === PROTOCOL_ID) { - conn = _conn - decode.once('data', incMsg) - callback() - } else { - // TODO This would be where we try to support other versions - // of multistream (backwards compatible). Currently we have - // just one, so this never happens. - return callback(new Error('not supported version of multistream')) + cb() } + }, () => { + cb(new Error('unkown protocol')) }) - function incMsg (msgBuffer) { - const msg = msgBuffer.toString().slice(0, -1) - - if (msg === 'ls') { - const protos = Object.keys(handlers) - const nProtos = protos.length - // total size of the list of protocols, including varint and newline - const size = protos.reduce((size, proto) => { - var p = new Buffer(proto + '\n') - var el = varint.encodingLength(p.length) - return size + el - }, 0) + pull(conn, ms, conn) + } - var nProtoVI = new Buffer(varint.encode(nProtos)) - var sizeVI = new Buffer(varint.encode(size)) - var buf = Buffer.concat([nProtoVI, sizeVI, new Buffer('\n')]) - encode.write(buf) - protos.forEach((proto) => { - encode.write(new Buffer(proto + '\n')) - }) - } + // be ready for a given `protocol` + addHandler (protocol, handler) { + log('handling %s', protocol) - if (handlers[msg]) { - // Protocol supported, ACK back - encode.write(new Buffer(msg + '\n')) - return handlers[msg](conn) - } else { - // Protocol not supported, wait for new handshake - encode.write(new Buffer('na' + '\n')) - } + assert(isFunction(handler), 'handler must be a function') - decode.once('data', incMsg) + if (this.handlers[protocol]) { + // TODO: Do we want to handle this better? + log('overwriting handler for %s', protocol) } + + this.handlers[protocol] = handler } - // be ready for a given `protocol` - this.addHandler = (protocol, handlerFunc) => { - if ((typeof handlerFunc !== 'function')) { - throw new Error('handler function must be a function') - } + _ls (conn) { + const protos = Object.keys(this.handlers) + .filter((key) => key !== 'ls') + const nProtos = protos.length + // total size of the list of protocols, including varint and newline + const size = protos.reduce((size, proto) => { + const p = new Buffer(proto + '\n') + const el = varint.encodingLength(p.length) + return size + el + }, 0) + + const buf = Buffer.concat([ + new Buffer(varint.encode(nProtos)), + new Buffer(varint.encode(size)), + new Buffer('\n') + ]) + + const encodedProtos = protos.map((proto) => { + return new Buffer(proto + '\n') + }) + const values = [buf].concat(encodedProtos) - handlers[protocol] = handlerFunc + pull( + pull.values(values), + lp.encode(), + conn + ) } } diff --git a/test/multistream.spec.js b/test/index.spec.js similarity index 61% rename from test/multistream.spec.js rename to test/index.spec.js index 41c3cf3..e0e24eb 100644 --- a/test/multistream.spec.js +++ b/test/index.spec.js @@ -3,17 +3,17 @@ 'use strict' const expect = require('chai').expect -const streamPair = require('stream-pair') +const pull = require('pull-stream') +const pair = require('pull-pair/duplex') const multistream = require('../src') const parallel = require('run-parallel') const series = require('run-series') -const bl = require('bl') describe('multistream normal mode', function () { it('performs multistream handshake', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] parallel([ (cb) => { @@ -30,9 +30,9 @@ describe('multistream normal mode', function () { }) it('handle and select a protocol', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -53,82 +53,36 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - next() - })) - conn.write('banana') - conn.end() - }) - } - ], done) - }) - it('handle and select a protocol, respecting pause and resume ', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other - let handled = false - let msl - let msd - series([ - (next) => { - parallel([ - (cb) => { - msl = new multistream.Listener() - expect(msl).to.exist - msl.handle(listenerConn, cb) - }, - (cb) => { - msd = new multistream.Dialer() - expect(msd).to.exist - msd.handle(dialerConn, cb) - } - ], next) - }, - (next) => { - dialerConn.cork() - listenerConn.pause() - - msl.addHandler('/monkey/1.0.0', (conn) => { - handled = true - conn.pipe(conn) - }) - next() - }, - (next) => { - msd.select('/monkey/1.0.0', (err, conn) => { - expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - expect(handled).to.be.eql(true) - next() - })) - conn.write('banana') - conn.end() + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + next() + }) + ) }) - setTimeout(() => { - expect(handled).to.be.eql(false) - dialerConn.uncork() - listenerConn.resume() - }, 100) } ], done) }) it('select non existing proto', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -157,9 +111,9 @@ describe('multistream normal mode', function () { }) it('ls', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -180,19 +134,19 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/giraffe/2.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, (next) => { msl.addHandler('/elephant/2.5.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() }, @@ -211,9 +165,9 @@ describe('multistream normal mode', function () { }) it('handler must be a function', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -233,20 +187,20 @@ describe('multistream normal mode', function () { ], next) }, (next) => { - try { - msd.addHandler('/monkey/1.0.0', 'potato') - } catch (err) { - expect(err).to.exist - next() - } + expect( + () => msl.addHandler('/monkey/1.0.0', 'potato') + ).to.throw( + /must be a function/ + ) + next() } ], done) }) it('racing condition resistent', (done) => { - const sp = streamPair.create() - const dialerConn = sp - const listenerConn = sp.other + const p = pair() + const dialerConn = p[0] + const listenerConn = p[1] let msl let msd @@ -262,7 +216,7 @@ describe('multistream normal mode', function () { }, (next) => { msl.addHandler('/monkey/1.0.0', (conn) => { - conn.pipe(conn) + pull(conn, conn) }) next() } @@ -274,13 +228,20 @@ describe('multistream normal mode', function () { expect(err).to.not.exist msd.select('/monkey/1.0.0', (err, conn) => { expect(err).to.not.exist - conn.pipe(bl((err, data) => { - expect(err).to.not.exist - expect(data.toString()).to.equal('banana') - cb() - })) - conn.write('banana') - conn.end() + + pull( + pull.values(['banana']), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist + expect( + data + ).to.be.eql( + ['banana'] + ) + cb() + }) + ) }) }) }