Skip to content
This repository has been archived by the owner on Dec 27, 2019. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
brianc committed Aug 5, 2017
1 parent c9e21f4 commit e517b8c
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 127 deletions.
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
language: node_js
node_js:
- "0.10"
- "0.12"
- "4.2"
- "6"
- "7"
- "8"
env:
- PGUSER=postgres PGDATABASE=postgres
110 changes: 52 additions & 58 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,69 +1,63 @@
'use strict';
var util = require('util')
var Cursor = require('pg-cursor')
var Readable = require('readable-stream').Readable
var Readable = require('stream').Readable

var QueryStream = module.exports = function(text, values, options) {
var self = this
this._reading = false
this._closing = false
options = options || { }
Cursor.call(this, text, values)
Readable.call(this, {
objectMode: true,
highWaterMark: options.highWaterMark || 1000
})
this.batchSize = options.batchSize || 100
this.once('end', function() {
process.nextTick(function() {
self.emit('close')
})
})
}
class PgQueryStream extends Readable {
constructor(text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values)
this._reading = false
this._closed = false
this.batchSize = (options || { }).batchSize || 100

util.inherits(QueryStream, Readable)
// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor)
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor)
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor)
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor)
this.handleError = this.cursor.handleError.bind(this.cursor)
}

//copy cursor prototype to QueryStream
//so we can handle all the events emitted by the connection
for(var key in Cursor.prototype) {
if(key == 'read') {
QueryStream.prototype._fetch = Cursor.prototype.read
} else {
QueryStream.prototype[key] = Cursor.prototype[key]
submit(connection) {
this.cursor.submit(connection)
return this
}
}

QueryStream.prototype.close = function(cb) {
this._closing = true
var self = this
Cursor.prototype.close.call(this, function(err) {
if (cb) { cb(err); }
if(err) return self.emit('error', err)
process.nextTick(function() {
self.push(null)
})
})
}
close(callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
}

QueryStream.prototype._read = function(n) {
if(this._reading || this._closing) return false
this._reading = true
var self = this
this._fetch(this.batchSize, function(err, rows) {
if(err) {
return self.emit('error', err)
_read(size) {
if (this._reading || this._closed) {
return false
}
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

if (self._closing) { return; }

if(!rows.length) {
process.nextTick(function() {
self.push(null)
})
return
}
self._reading = false
for(var i = 0; i < rows.length; i++) {
self.push(rows[i])
}
})
// push each row into the stream
this._reading = false
for(var i = 0; i < rows.length; i++) {
this.push(rows[i])
}
})
}
}

module.exports = PgQueryStream
17 changes: 8 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Postgres query result returned as readable stream",
"main": "index.js",
"scripts": {
"test": "mocha test/*.js -R spec"
"test": "mocha"
},
"repository": {
"type": "git",
Expand All @@ -17,21 +17,20 @@
"stream"
],
"author": "Brian M. Carlson",
"license": "BSD-2-Clause",
"license": "MIT",
"bugs": {
"url": "https://github.com/brianc/node-pg-query-stream/issues"
},
"devDependencies": {
"pg.js": "*",
"JSONStream": "~0.7.1",
"concat-stream": "~1.0.1",
"through": "~2.3.4",
"stream-tester": "0.0.5",
"mocha": "^3.5.0",
"pg": "6.x",
"stream-spec": "~0.3.5",
"JSONStream": "~0.7.1",
"mocha": "~1.17.1"
"stream-tester": "0.0.5",
"through": "~2.3.4"
},
"dependencies": {
"pg-cursor": "1.0.0",
"readable-stream": "^2.0.4"
"pg-cursor": "1.2.1"
}
}
46 changes: 0 additions & 46 deletions test/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,52 +34,6 @@ helper('early close', function(client) {
})
})

helper('should not throw errors after early close', function(client) {
it('can be closed early without error', function(done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 2000) num');
var query = client.query(stream);
var fetchCount = 0;
var errorCount = 0;


function waitForErrors() {

setTimeout(function () {
assert(errorCount === 0, 'should not throw a ton of errors');
done();
}, 10);
}

// hack internal _fetch function to force query.close immediately after _fetch is called (simulating the race condition)
// race condition: if close is called immediately after _fetch is called, but before results are returned, errors are thrown
// when the fetch results are pushed to the readable stream after its already closed.
query._fetch = (function (_fetch) {
return function () {

// wait for the second fetch. closing immediately after the first fetch throws an entirely different error :(
if (fetchCount++ === 0) {
return _fetch.apply(this, arguments);
}

var results = _fetch.apply(this, arguments);

query.close();
waitForErrors();

query._fetch = _fetch; // we're done with our hack, so restore the original _fetch function.

return results;
}
}(query._fetch));

query.on('error', function () { errorCount++; });

query.on('readable', function () {
query.read();
});
});
});

helper('close callback', function (client) {
it('notifies an optional callback when the conneciton is closed', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2});
Expand Down
2 changes: 1 addition & 1 deletion test/helper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var pg = require('pg.js')
var pg = require('pg')
module.exports = function(name, cb) {
describe(name, function() {
var client = new pg.Client()
Expand Down
2 changes: 1 addition & 1 deletion test/issue-3.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var pg = require('pg.js')
var pg = require('pg')
var QueryStream = require('../')
describe('end semantics race condition', function() {
before(function(done) {
Expand Down
1 change: 1 addition & 0 deletions test/mocha.opts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
--no-exit
--bail
18 changes: 9 additions & 9 deletions test/stream-tester-timestamp.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var pg = require('pg.js')
var pg = require('pg')
var QueryStream = require('../')
var spec = require('stream-spec')
var assert = require('assert')
Expand All @@ -10,20 +10,20 @@ require('./helper')('stream tester timestamp', function(client) {
var stream = new QueryStream(sql, [])
var ended = false
var query = client.query(stream)
query.
on('end', function() { ended = true })
query.on('end', function() { ended = true })
spec(query)
.readable()
.pausable({strict: true})
.validateOnExit()
;
.pausable({ strict: true })
.validateOnExit();
var checkListeners = function() {
assert(stream.listeners('end').length < 10)
if (!ended)
if (!ended) {
setImmediate(checkListeners)
else
}
else {
done()
}
}
checkListeners()
})
})
})

0 comments on commit e517b8c

Please sign in to comment.