Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

Commit

Permalink
feat(pull): migration to pull-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 6, 2016
1 parent e5a48d0 commit b8de80d
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 225 deletions.
28 changes: 16 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "multistream-select",
"version": "0.10.0",
"description": "JavaScript implementation of the multistream spec",
"main": "lib/index.js",
"main": "src/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"lint": "aegir-lint",
Expand Down Expand Up @@ -41,24 +41,28 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"dependencies": {
"babel-runtime": "^6.6.1",
"length-prefixed-stream": "^1.5.0",
"lodash.range": "^3.1.5",
"run-series": "^1.1.4",
"varint": "^4.0.0"
"babel-runtime": "^6.11.6",
"debug": "^2.2.0",
"interface-connection": "^0.1.8",
"lodash.isfunction": "^3.0.8",
"lodash.range": "^3.1.7",
"pull-handshake": "^1.1.3",
"pull-length-prefixed": "^1.0.0",
"pull-stream": "^3.4.3",
"varint": "^4.0.1"
},
"devDependencies": {
"aegir": "^3.1.0",
"run-parallel": "^1.1.6",
"bl": "^1.1.2",
"aegir": "^6.0.0",
"chai": "^3.5.0",
"pre-commit": "^1.1.2",
"stream-pair": "^1.0.3"
"pre-commit": "^1.1.3",
"pull-pair": "^1.1.0",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"David Dias <mail@daviddias.me>",
"Richard Littauer <richard.littauer@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
77 changes: 77 additions & 0 deletions src/agreement.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict'

const handshake = require('pull-handshake')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:multistream:agreement')
log.error = debug('libp2p:multistream:agreement:error')

exports.dial = (header, cb) => {
const stream = handshake({timeout: 60 * 1000}, cb)
const shake = stream.handshake

log('writing header %s', header)
writeEncoded(shake, new Buffer(header + '\n'), cb)

lp.decodeFromReader(shake, (err, data) => {
if (err) return cb(err)
const protocol = data.toString().slice(0, -1)
if (protocol !== header) {
cb(new Error(`Unkown header: "${protocol}"`))
}

log('header ack')
cb(null, shake.rest())
})

return stream
}

exports.listen = (rawConn, handlersMap, defaultHandler) => {
const cb = (err) => {
// TODO: pass errors somewhere
log.error(err)
}
const stream = handshake({timeout: 60 * 1000}, cb)
const shake = stream.handshake

lp.decodeFromReader(shake, (err, data) => {
if (err) return cb(err)
log('received: %s', data.toString())
const protocol = data.toString().slice(0, -1)
const [key] = Object.keys(handlersMap).filter((id) => id === protocol)

if (key) {
log('ack: %s', protocol)
writeEncoded(shake, data, cb)
handlersMap[key](new Connection(shake.rest(), rawConn))
} else {
log('unkown protocol: %s', protocol)
defaultHandler(protocol, shake.rest())
}
})

return stream
}

function encode (msg, cb) {
const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)]

pull(
pull.values(values),
lp.encode(),
pull.collect((err, encoded) => {
if (err) return cb(err)
cb(null, encoded[0])
})
)
}

function writeEncoded (writer, msg, cb) {
encode(msg, (err, msg) => {
if (err) return cb(err)
writer.write(msg)
})
}
118 changes: 64 additions & 54 deletions src/dialer.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,88 @@
'use strict'

const lps = require('length-prefixed-stream')
const PROTOCOL_ID = require('./protocol-id')
const lp = require('pull-length-prefixed')
const varint = require('varint')
const range = require('lodash.range')
const series = require('run-series')
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:multistream:dialer')

exports = module.exports = Dialer
const PROTOCOL_ID = require('./protocol-id')
const agreement = require('./agreement')

function Dialer () {
if (!(this instanceof Dialer)) {
return new Dialer()
module.exports = class Dialer {
constructor () {
this.conn = null
}

const encode = lps.encode()
const decode = lps.decode()
let conn

// perform the multistream handshake
this.handle = (_conn, callback) => {
encode.pipe(_conn)
_conn.pipe(decode)

decode.once('data', (buffer) => {
const msg = buffer.toString().slice(0, -1)
if (msg === PROTOCOL_ID) {
encode.write(new Buffer(PROTOCOL_ID + '\n'))
conn = _conn
callback()
} else {
callback(new Error('Incompatible multistream'))
handle (rawConn, cb) {
log('handling connection')
const ms = agreement.dial(PROTOCOL_ID, (err, conn) => {
if (err) {
return cb(err)
}
log('handshake success')

this.conn = new Connection(conn, rawConn)

cb()
})
pull(rawConn, ms, rawConn)
}

this.select = (protocol, callback) => {
if (!conn) {
return callback(new Error('multistream handshake has not finalized yet'))
select (protocol, cb) {
log('selecting %s', protocol)
if (!this.conn) {
return cb(new Error('multistream handshake has not finalized yet'))
}

encode.write(new Buffer(protocol + '\n'))
decode.once('data', function (msgBuffer) {
const msg = msgBuffer.toString().slice(0, -1)
if (msg === protocol) {
return callback(null, conn)
}
if (msg === 'na') {
return callback(new Error(protocol + ' not supported'))
const stream = agreement.dial(protocol, (err, conn) => {
if (err) {
return cb(err)
}
// TODO: handle 'na'
cb(null, new Connection(conn, this.conn))
})

pull(this.conn, stream, this.conn)
}

this.ls = (callback) => {
encode.write(new Buffer('ls' + '\n'))
let protos = []
decode.once('data', function (msgBuffer) {
const size = varint.decode(msgBuffer) // eslint-disable-line
const nProtos = varint.decode(msgBuffer, varint.decode.bytes)

timesSeries(nProtos, (n, next) => {
decode.once('data', function (msgBuffer) {
protos.push(msgBuffer.toString().slice(0, -1))
next()
ls (cb) {
const ls = agreement.dial('ls', (err, conn) => {
if (err) return cb(err)

pull(
conn,
lp.decode(),
collectLs(conn),
pull.map(stringify),
pull.collect((err, list) => {
if (err) return cb(err)
return cb(null, list.slice(1))
})
}, (err) => {
if (err) {
return callback(err)
}
callback(null, protos)
})
)
})

pull(this.conn, ls, this.conn)
}
}

function timesSeries (i, work, callback) {
series(range(i).map((i) => (callback) => work(i, callback)), callback)
function stringify (buf) {
return buf.toString().slice(0, -1)
}

function collectLs (conn) {
let first = true
let counter = 0

return pull.take((msg) => {
if (first) {
const size = varint.decode(msg) // eslint-disable-line
counter = varint.decode(msg, varint.decode.bytes)
return true
}

return counter-- > 0
})
}
Loading

0 comments on commit b8de80d

Please sign in to comment.