fix: properly handle asynchronous read from stream (#1284)
* test: test ReadRows logic with local gRPC server

* test: PR feedback

* test: fix race condition in initialization

* test: PR feedback, renaming a variable for readability

* fix: properly handle asynchronous read from stream

* test: skip failing Windows test

* test: increase timeout on Windows

* fix: PR feedback

* test: only set lastScannedRowKey for completed rows

* fix: bring back the lastScannedRowKey logic

* test: pick latest changes from main branch

* fix: add transform method to userStream, handle cancellation in it

* fix: PR feedback

---------

Co-authored-by: danieljbruce <[email protected]>
alexander-fenster and danieljbruce committed May 30, 2023
1 parent f953911 commit 55d86ba
Showing 4 changed files with 54 additions and 22 deletions.
16 changes: 13 additions & 3 deletions src/chunktransformer.ts
@@ -129,10 +129,10 @@ export class ChunkTransformer extends Transform {
    * @public
    *
    * @param {object} data readrows response containing array of chunks.
-   * @param {object} [enc] encoding options.
+   * @param {object} [_encoding] encoding options.
    * @param {callback} next callback will be called once data is processed, with error if any error in processing
    */
-  _transform(data: Data, enc: string, next: Function): void {
+  _transform(data: Data, _encoding: string, next: Function): void {
     for (const chunk of data.chunks!) {
       switch (this.state) {
         case RowStateEnum.NEW_ROW:
@@ -148,6 +148,7 @@
           break;
       }
       if (this._destroyed) {
+        next();
         return;
       }
     }
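
The `next()` added above is the heart of this fix: a Transform stream's `_transform` must invoke its callback for every chunk, even when the output is discarded, or upstream backpressure stalls and the pipeline hangs. A minimal sketch of the same pattern in isolation (the class and names here are illustrative, not the library's code):

```ts
import {Transform, TransformCallback} from 'stream';

// A destroyable Transform that drops incoming chunks once stopped,
// but still acknowledges each one so the upstream writer is not stalled.
class DroppingTransform extends Transform {
  private stopped = false;

  constructor() {
    super({objectMode: true});
  }

  _transform(
    chunk: unknown,
    _encoding: BufferEncoding,
    next: TransformCallback
  ): void {
    if (this.stopped) {
      // Acknowledge without emitting; returning here without calling
      // next() would leave the pipeline waiting forever.
      next();
      return;
    }
    next(null, chunk);
  }

  stop(): void {
    this.stopped = true;
  }
}

const dropper = new DroppingTransform();
dropper.on('data', d => console.log('got', d));
dropper.write('a'); // emitted
dropper.stop();
dropper.write('b'); // acknowledged but not emitted
dropper.end();
```
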
@@ -226,7 +227,16 @@
       chunk.familyName ||
       chunk.qualifier ||
       (chunk.value && chunk.value.length !== 0) ||
-      chunk.timestampMicros! > 0;
+      // timestampMicros is an int64 in the protobuf definition,
+      // which can be either a number or an instance of Long.
+      // If it's a number...
+      (typeof chunk.timestampMicros === 'number' &&
+        chunk.timestampMicros! > 0) ||
+      // If it's an instance of Long...
+      (typeof chunk.timestampMicros === 'object' &&
+        'compare' in chunk.timestampMicros &&
+        typeof chunk.timestampMicros.compare === 'function' &&
+        chunk.timestampMicros.compare(0) === 1);
     if (chunk.resetRow && containsData) {
       this.destroy(
         new TransformError({
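
The widened `containsData` check above accounts for `timestampMicros` being an int64, which protobufjs may surface either as a JavaScript number or as a `Long`. A standalone sketch of the same guard, assuming the `long` npm package (the function name is illustrative):

```ts
import Long from 'long';

// timestampMicros is an int64 protobuf field, so it may arrive as a
// plain number or as a Long instance depending on how it was decoded.
function isPositiveTimestamp(timestampMicros: number | Long): boolean {
  if (typeof timestampMicros === 'number') {
    return timestampMicros > 0;
  }
  // Long#compare returns 1 when the receiver is greater than its argument.
  return Long.isLong(timestampMicros) && timestampMicros.compare(0) === 1;
}

console.log(isPositiveTimestamp(12));                  // true
console.log(isPositiveTimestamp(Long.fromNumber(12))); // true
console.log(isPositiveTimestamp(Long.ZERO));           // false
```
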
49 changes: 39 additions & 10 deletions src/table.ts
@@ -745,22 +745,51 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       filter = Filter.parse(options.filter);
     }
 
-    const userStream = new PassThrough({objectMode: true});
-    const end = userStream.end.bind(userStream);
-    userStream.end = () => {
+    let chunkTransformer: ChunkTransformer;
+    let rowStream: Duplex;
+
+    let userCanceled = false;
+    const userStream = new PassThrough({
+      objectMode: true,
+      transform(row, _encoding, callback) {
+        if (userCanceled) {
+          callback();
+          return;
+        }
+        callback(null, row);
+      },
+    });
+
+    // The caller should be able to call userStream.end() to stop receiving
+    // more rows and cancel the stream prematurely. But also, the 'end' event
+    // will be emitted if the stream ended normally. To tell these two
+    // situations apart, we'll save the "original" end() function, and
+    // will call it on rowStream.on('end').
+    const originalEnd = userStream.end.bind(userStream);
+
+    // Taking care of this extra listener when piping and unpiping userStream:
+    const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => {
+      rowStream.pipe(userStream, {end: false});
+      rowStream.on('end', originalEnd);
+    };
+    const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => {
+      rowStream?.unpipe(userStream);
+      rowStream?.removeListener('end', originalEnd);
+    };
+
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
+      rowStreamUnpipe(rowStream, userStream);
+      userCanceled = true;
       if (activeRequestStream) {
         activeRequestStream.abort();
       }
       if (retryTimer) {
         clearTimeout(retryTimer);
       }
-      return end();
+      return originalEnd(chunk, encoding, cb);
     };
 
-    let chunkTransformer: ChunkTransformer;
-    let rowStream: Duplex;
-
     const makeNewRequest = () => {
       // Avoid cancelling an expired timer if user
       // cancelled the stream in the middle of a retry
@@ -882,7 +911,7 @@
       const toRowStream = new Transform({
         transform: (rowData, _, next) => {
           if (
-            chunkTransformer._destroyed ||
+            userCanceled ||
             // eslint-disable-next-line @typescript-eslint/no-explicit-any
             (userStream as any)._writableState.ended
           ) {
@@ -913,7 +942,7 @@
 
       rowStream
         .on('error', (error: ServiceError) => {
-          rowStream.unpipe(userStream);
+          rowStreamUnpipe(rowStream, userStream);
           activeRequestStream = null;
           if (IGNORED_STATUS_CODES.has(error.code)) {
             // We ignore the `cancelled` "error", since we are the ones who cause
@@ -947,7 +976,7 @@
         .on('end', () => {
           activeRequestStream = null;
         });
-      rowStream.pipe(userStream);
+      rowStreamPipe(rowStream, userStream);
     };
 
     makeNewRequest();
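
The `rowStreamPipe`/`rowStreamUnpipe` helpers above implement a general pattern: pipe with `{end: false}` and forward the source's `'end'` event through the saved `originalEnd`, so that overriding `userStream.end()` to mean "cancel" does not break normal completion. A self-contained sketch of that pattern outside the library (stream names and data are made up):

```ts
import {PassThrough, Readable} from 'stream';

const source = Readable.from(['row1', 'row2', 'row3']);

let canceled = false;
const userStream = new PassThrough({
  objectMode: true,
  // Drop anything that arrives after the caller canceled.
  transform(row, _encoding, callback) {
    if (canceled) {
      callback();
      return;
    }
    callback(null, row);
  },
});

// Keep the "real" end() so normal completion can still finish the stream.
const originalEnd = userStream.end.bind(userStream);
source.pipe(userStream, {end: false});
source.on('end', originalEnd);

// Calling userStream.end() now means "cancel": detach the source first,
// then finish the stream through the saved original end().
// eslint-disable-next-line @typescript-eslint/no-explicit-any
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
  source.unpipe(userStream);
  source.removeListener('end', originalEnd);
  canceled = true;
  return originalEnd(chunk, encoding, cb);
};

userStream.on('data', row => console.log('row:', row));
```

Calling `userStream.end()` here detaches `source` before ending, which is the same mechanism the table.ts change uses to let ReadRows consumers stop mid-stream.
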
6 changes: 0 additions & 6 deletions test/chunktransformer.ts
@@ -997,12 +997,6 @@ describe('Bigtable/ChunkTransformer', () => {
       const err = callback.getCall(0).args[0];
       assert(!err, 'did not expect error');
     });
-    it('should return when stream is destroyed', () => {
-      chunkTransformer._destroyed = true;
-      const chunks = [{key: 'key'}];
-      chunkTransformer._transform({chunks}, {}, callback);
-      assert(!callback.called, 'unexpected call to next');
-    });
     it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => {
       chunkTransformer._transform(
         {chunks: [], lastScannedRowKey: 'foo'},
5 changes: 2 additions & 3 deletions test/readrows.ts
@@ -122,8 +122,7 @@ describe('Bigtable/ReadRows', () => {
       pipeline(readStream, transform, passThrough, () => {});
     });
 
-    // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed
-    it.skip('should create read stream and read asynchronously using Transform stream', function (done) {
+    it('should create read stream and read asynchronously using Transform stream', function (done) {
       if (process.platform === 'win32') {
         this.timeout(60000); // it runs much slower on Windows!
       }
@@ -222,7 +221,7 @@
     });
   });
 
-  // TODO(@alexander-fenster): enable after it's fixed
+  // TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed
   it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) {
     if (process.platform === 'win32') {
       this.timeout(60000); // it runs much slower on Windows!
