From 119a0ff3e161b305feaca18cdbd89b66f3422c30 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 6 Jan 2026 10:17:57 +0100 Subject: [PATCH] stream: add SyncTransform Add SyncTransform, a high-performance synchronous transform stream based on the syncthrough npm package. Unlike the standard Transform stream, SyncTransform uses a synchronous transformation function (no callback), which results in significantly better performance. Key features: - Synchronous transform function for ~10x better performance - Strict backpressure enforcement without internal buffering - Works with pipeline(), pipe(), and 'data' events - Supports object mode automatically - Includes flush function for final data emission PR-URL: https://github.com/nodejs/node/pull/XXXXX --- doc/api/stream.md | 169 +++++ lib/internal/streams/synctransform.js | 256 ++++++++ lib/stream.js | 1 + test/parallel/test-stream-synctransform.js | 719 +++++++++++++++++++++ 4 files changed, 1145 insertions(+) create mode 100644 lib/internal/streams/synctransform.js create mode 100644 test/parallel/test-stream-synctransform.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 2a4268900966f4..1fb64d98aecddb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -4771,6 +4771,175 @@ stream that simply passes the input bytes across to the output. Its purpose is primarily for examples and testing, but there are some use cases where `stream.PassThrough` is useful as a building block for novel sorts of streams. +#### Class: `stream.SyncTransform` + + + +> Stability: 1 - Experimental + +The `stream.SyncTransform` class is a high-performance synchronous transform +stream that enforces backpressure without internal buffering. Unlike the +standard [`Transform`][] stream, the transformation function is synchronous +(no callback), which results in significantly better performance (up to 10x +faster). + +```js +const { SyncTransform, pipeline } = require('node:stream'); +const { createReadStream } = require('node:fs'); + +pipeline( + createReadStream('input.txt'), + SyncTransform((chunk) => { + return chunk.toString().toUpperCase(); + }), + process.stdout, + (err) => { + if (err) console.error(err); + } +); +``` + +##### `new stream.SyncTransform([transform[, flush]])` + + + +* `transform` {Function} The synchronous transformation function. + * `chunk` {any} The chunk to be transformed. + * Returns: {any} The transformed chunk. Return `null` to end the stream, + or `undefined` to skip this chunk. +* `flush` {Function} Optional function called before the stream ends. + * Returns: {any} Optional final chunk to write before ending. + +Creates a new `SyncTransform` stream. Can be called with or without `new`. + +```js +const { SyncTransform } = require('node:stream'); + +// Using as a function +const upper = SyncTransform((chunk) => chunk.toString().toUpperCase()); + +// Using with new +const lower = new SyncTransform((chunk) => chunk.toString().toLowerCase()); + +// With flush +const withTrailer = SyncTransform( + (chunk) => chunk, + () => Buffer.from('END') +); +``` + +##### `syncTransform.pipe(destination[, options])` + + + +* `destination` {stream.Writable} The destination for writing data. +* `options` {Object} + * `end` {boolean} End the destination when the source ends. **Default:** `true`. +* Returns: {stream.Writable} The destination stream. + +Pipes data to the destination. Unlike standard streams, `SyncTransform` only +allows piping to a single destination. Attempting to pipe to multiple +destinations will throw an error. 
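+
+For example, passing `{ end: false }` keeps the destination open after the
+`SyncTransform` ends, which can be useful when several sources share a single
+destination. The following sketch assumes a hypothetical `app.log` file:
+
+```js
+const { SyncTransform } = require('node:stream');
+const { createWriteStream } = require('node:fs');
+
+const logFile = createWriteStream('app.log');
+const upper = SyncTransform((chunk) => chunk.toString().toUpperCase());
+
+// Do not end the destination when `upper` ends.
+upper.pipe(logFile, { end: false });
+
+upper.end('first source done\n');
+// `logFile` is still writable here and can receive data from other sources.
+```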
+ +##### `syncTransform.unpipe(destination)` + + + +* `destination` {stream.Writable} The destination to unpipe. +* Returns: {stream.SyncTransform} Returns `this`. + +Removes the pipe to the specified destination. + +##### `syncTransform.write(chunk)` + + + +* `chunk` {any} The data to write. +* Returns: {boolean} `false` if backpressure should be applied, `true` otherwise. + +Writes data to the stream. The transform function is called synchronously +with the chunk. If no destination is piped, the transformed data is held +until a destination is available (backpressure is enforced). + +##### `syncTransform.push(chunk)` + + + +* `chunk` {any} The data to push to the destination. +* Returns: {stream.SyncTransform} Returns `this`. + +Pushes a chunk directly to the destination. Can be called multiple times +within the transform function to output multiple chunks from a single input. + +```js +const { SyncTransform } = require('node:stream'); + +const duplicator = SyncTransform(function(chunk) { + this.push(chunk); + this.push(chunk); + // Return undefined to skip the normal return path +}); +``` + +##### `syncTransform.end([chunk])` + + + +* `chunk` {any} Optional final chunk to write. +* Returns: {stream.SyncTransform} Returns `this`. + +Signals the end of the writable side of the stream. If a flush function was +provided, it is called and its return value (if any) is written to the +destination before ending. + +##### `syncTransform.destroy([error])` + + + +* `error` {Error} Optional error to emit. +* Returns: {stream.SyncTransform} Returns `this`. + +Destroys the stream. If an error is provided, an `'error'` event is emitted. +A `'close'` event is always emitted. + +##### Caveats + +The `SyncTransform` stream has some important differences from standard +streams: + +1. **Strict backpressure**: Writing to a `SyncTransform` before it's piped + will buffer exactly one chunk. Writing again before piping or before the + destination is ready will emit an error. + +2. **Single destination**: Only one `pipe()` destination is allowed. Attempting + to pipe to multiple destinations throws an error. + +3. **No `readable` event**: Data is pushed directly to the destination when + available, rather than being buffered for pull-based consumption. + +4. **Either `pipe()` or `'data'` event**: You can use `pipe()` or listen to + `'data'` events, but not both. Adding a `'data'` listener after calling + `pipe()` throws an error. + +5. **Object mode by default**: Unlike standard streams, `SyncTransform` + automatically handles objects without requiring explicit configuration. + ## Additional notes diff --git a/lib/internal/streams/synctransform.js b/lib/internal/streams/synctransform.js new file mode 100644 index 00000000000000..e9d168c2b2500f --- /dev/null +++ b/lib/internal/streams/synctransform.js @@ -0,0 +1,256 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// A synchronous transform stream. +// Passes data through synchronously with optional transformation. +// Enforces backpressure without internal buffering for high performance. + +'use strict'; + +const { + FunctionPrototypeCall, + ObjectSetPrototypeOf, +} = primordials; + +const { EventEmitter } = require('events'); + +const { + codes: { + ERR_STREAM_WRITE_AFTER_END, + }, +} = require('internal/errors'); + +const { validateFunction } = require('internal/validators'); + +const kDestination = Symbol('kDestination'); +const kInFlight = Symbol('kInFlight'); +const kTransform = Symbol('kTransform'); +const kFlush = Symbol('kFlush'); +const kDestinationNeedsEnd = Symbol('kDestinationNeedsEnd'); +const kLastPush = Symbol('kLastPush'); +const kEndEmitted = Symbol('kEndEmitted'); +const kDestroyed = Symbol('kDestroyed'); + +function passthrough(chunk) { + return chunk; +} + +function SyncTransform(transform, flush) { + if (!(this instanceof SyncTransform)) + return new SyncTransform(transform, flush); + + if (transform !== undefined && transform !== null) { + validateFunction(transform, 'transform'); + } + if (flush !== undefined && flush !== null) { + validateFunction(flush, 'flush'); + } + + FunctionPrototypeCall(EventEmitter, this); + + this[kTransform] = transform || passthrough; + this[kFlush] = flush || passthrough; + this[kDestination] = null; + this[kInFlight] = undefined; + this.writable = true; + this[kEndEmitted] = false; + this[kDestinationNeedsEnd] = true; + this[kLastPush] = true; + this[kDestroyed] = false; + + this.on('newListener', onNewListener); + this.on('removeListener', onRemoveListener); + this.on('end', onEnd); +} + +ObjectSetPrototypeOf(SyncTransform.prototype, EventEmitter.prototype); +ObjectSetPrototypeOf(SyncTransform, EventEmitter); + +function onNewListener(ev) { + if (ev === 'data') { + if (this[kDestination] && !(this[kDestination] instanceof OnData)) { + throw new Error('you can use only pipe() or on(\'data\')'); + } + process.nextTick(deferPiping, this); + } +} + +function deferPiping(that) { + if (that[kDestination] && that[kDestination] instanceof OnData) { + // Nothing to do, piping was deferred twice for on('data'). 
+ return; + } + + that.pipe(new OnData(that)); + if (!that.writable && !that[kEndEmitted]) { + that.emit('end'); + } +} + +function onRemoveListener(ev) { + if (ev === 'data' && + ((ev.listenerCount && ev.listenerCount(this, ev)) !== 0)) { + this.unpipe(this[kDestination]); + } +} + +function onEnd() { + this[kEndEmitted] = true; +} + +SyncTransform.prototype.pipe = function(dest, opts) { + const that = this; + const inFlight = this[kInFlight]; + + if (this[kDestination]) { + throw new Error('multiple pipe not allowed'); + } + this[kDestination] = dest; + + dest.emit('pipe', this); + + this[kDestination].on('drain', function() { + that.emit('drain'); + }); + + this[kDestination].on('end', function() { + that.end(); + }); + + this[kDestinationNeedsEnd] = !opts || opts.end !== false; + + if (inFlight && this[kDestination].write(inFlight)) { + this[kInFlight] = undefined; + this.emit('drain'); + } else if (inFlight === null) { + doEnd(this); + } + + return dest; +}; + +SyncTransform.prototype.unpipe = function(dest) { + if (!this[kDestination] || this[kDestination] !== dest) { + return this; + } + + this[kDestination] = null; + + dest.emit('unpipe', this); + + return this; +}; + +SyncTransform.prototype.write = function(chunk) { + if (!this.writable) { + this.emit('error', new ERR_STREAM_WRITE_AFTER_END()); + return false; + } + + const res = this[kTransform](chunk); + + if (!this[kDestination]) { + if (this[kInFlight]) { + this.emit('error', new Error('upstream must respect backpressure')); + return false; + } + this[kInFlight] = res; + return false; + } + + if (res) { + this[kLastPush] = this[kDestination].write(res); + } else if (res === null) { + doEnd(this); + return false; + } + + return this[kLastPush]; +}; + +SyncTransform.prototype.push = function(chunk) { + // Ignoring the return value. + this[kLastPush] = this[kDestination].write(chunk); + return this; +}; + +SyncTransform.prototype.end = function(chunk) { + if (chunk) { + this.write(chunk); // Errors if we are after EOF. + } + + doEnd(this); + + return this; +}; + +function doEnd(that) { + if (that.writable) { + that.writable = false; + if (that[kDestination]) { + that[kEndEmitted] = true; + const toFlush = that[kFlush]() || null; + if (that[kDestinationNeedsEnd]) { + that[kDestination].end(toFlush); + } else if (toFlush !== null) { + that[kDestination].write(toFlush); + } + that.emit('end'); + } + } +} + +SyncTransform.prototype.destroy = function(err) { + if (!this[kDestroyed]) { + this[kDestroyed] = true; + + process.nextTick(doDestroy, this, err); + } + + return this; +}; + +function doDestroy(that, err) { + if (err) { + that.emit('error', err); + } + that.emit('close'); +} + +// Internal class for handling on('data') events. +function OnData(parent) { + this.parent = parent; + FunctionPrototypeCall(EventEmitter, this); +} + +ObjectSetPrototypeOf(OnData.prototype, EventEmitter.prototype); +ObjectSetPrototypeOf(OnData, EventEmitter); + +OnData.prototype.write = function(chunk) { + this.parent.emit('data', chunk); + return true; +}; + +OnData.prototype.end = function() { + // Intentionally empty. 
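+  // The parent SyncTransform emits its own 'end' event from doEnd(), so this
+  // on('data') shim has nothing to finalize when the stream ends.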
+}; + +module.exports = SyncTransform; diff --git a/lib/stream.js b/lib/stream.js index a26cc0b81c557e..59832378250f6d 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -105,6 +105,7 @@ Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); +Stream.SyncTransform = require('internal/streams/synctransform'); Stream.duplexPair = require('internal/streams/duplexpair'); Stream.pipeline = pipeline; const { addAbortSignal } = require('internal/streams/add-abort-signal'); diff --git a/test/parallel/test-stream-synctransform.js b/test/parallel/test-stream-synctransform.js new file mode 100644 index 00000000000000..ad485a172a0017 --- /dev/null +++ b/test/parallel/test-stream-synctransform.js @@ -0,0 +1,719 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { test } = require('node:test'); +const { + Readable, + Writable, + SyncTransform, + pipeline, +} = require('stream'); +const fs = require('fs'); + +// Helper functions +function stringFrom(chunks) { + return new Readable({ + read() { + this.push(chunks.shift() || null); + }, + }); +} + +function stringSink(expected, callback) { + return new Writable({ + write(chunk, enc, cb) { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + cb(); + }, + final(cb) { + if (callback) callback(); + cb(); + }, + }); +} + +function delayedStringSink(expected, callback) { + return new Writable({ + highWaterMark: 2, + write(chunk, enc, cb) { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + setImmediate(cb); + }, + final(cb) { + if (callback) callback(); + cb(); + }, + }); +} + +function objectFrom(chunks) { + return new Readable({ + objectMode: true, + read() { + this.push(chunks.shift() || null); + }, + }); +} + +function objectSink(expected, callback) { + return new Writable({ + objectMode: true, + write(chunk, enc, cb) { + assert.deepStrictEqual(chunk, expected.shift()); + cb(); + }, + final(cb) { + if (callback) callback(); + cb(); + }, + }); +} + +test('SyncTransform - pipe', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + const sink = stringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - multiple pipe', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + const stream2 = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toLowerCase()); + }); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('foo'), Buffer.from('bar')]; + + await new Promise((resolve) => { + const sink = stringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(stream2).pipe(sink); + }); +}); + +test('SyncTransform - backpressure', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + const sink = 
delayedStringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - multiple pipe with backpressure', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + const stream2 = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toLowerCase()); + }); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar'), Buffer.from('baz')]); + const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('baz')]; + + await new Promise((resolve) => { + const sink = delayedStringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(stream2).pipe(sink); + }); +}); + +test('SyncTransform - objects', async () => { + const stream = SyncTransform(function(chunk) { + return { chunk }; + }); + const from = objectFrom([{ name: 'matteo' }, { answer: 42 }]); + const expected = [{ chunk: { name: 'matteo' } }, { chunk: { answer: 42 } }]; + + await new Promise((resolve) => { + const sink = objectSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - pipe event', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + stream.on('pipe', common.mustCall((s) => { + assert.strictEqual(s, from); + })); + + await new Promise((resolve) => { + const sink = stringSink(expected, resolve); + sink.on('pipe', common.mustCall((s) => { + assert.strictEqual(s, stream); + })); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - unpipe event', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = new Readable({ read() { } }); + const expected = [Buffer.from('FOO')]; + + await new Promise((resolve) => { + const sink = stringSink(expected); + sink.on('unpipe', common.mustCall((s) => { + assert.strictEqual(s, stream); + resolve(); + })); + + from.pipe(stream).pipe(sink); + from.push(Buffer.from('foo')); + process.nextTick(() => { + stream.unpipe(sink); + from.push(Buffer.from('bar')); + }); + }); +}); + +test('SyncTransform - data event', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + stream.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + }, 2)); + + stream.on('end', common.mustCall(resolve)); + from.pipe(stream); + }); +}); + +test('SyncTransform - end event during pipe', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + stream.on('end', common.mustCall(resolve)); + const sink = stringSink(expected); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - end()', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const expected 
= [Buffer.from('FOO')]; + + await new Promise((resolve) => { + stream.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + })); + + stream.on('end', common.mustCall(resolve)); + stream.end(Buffer.from('foo')); + }); +}); + +test('SyncTransform - on(\'data\') after end()', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const expected = [Buffer.from('FOO')]; + + stream.end(Buffer.from('foo')); + + await new Promise((resolve) => { + stream.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + })); + + stream.on('end', common.mustCall(resolve)); + }); +}); + +test('SyncTransform - double end()', async () => { + const stream = SyncTransform(); + stream.end('hello'); + + await new Promise((resolve) => { + stream.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + resolve(); + })); + stream.end('world'); + }); +}); + +test('SyncTransform - uppercase a file with on(\'data\')', async () => { + let str = ''; + let expected = ''; + + const stream = SyncTransform(function(chunk) { + return chunk.toString().toUpperCase(); + }); + + const fromPath = __filename; + + await new Promise((resolve) => { + stream.on('data', (chunk) => { + str += chunk; + }); + + const from = fs.createReadStream(fromPath); + from.pipe(new Writable({ + write(chunk, enc, cb) { + expected += chunk.toString().toUpperCase(); + cb(); + }, + })).on('finish', () => { + assert.strictEqual(str, expected); + resolve(); + }); + from.pipe(stream); + }); +}); + +test('SyncTransform - uppercase a file with pipe()', async () => { + let str = ''; + let expected = ''; + + const stream = SyncTransform(function(chunk) { + return chunk.toString().toUpperCase(); + }); + + stream.pipe(new Writable({ + objectMode: true, + write(chunk, enc, cb) { + str += chunk; + cb(); + }, + })); + + const fromPath = __filename; + + await new Promise((resolve) => { + const from = fs.createReadStream(fromPath); + from.pipe(new Writable({ + write(chunk, enc, cb) { + expected += chunk.toString().toUpperCase(); + cb(); + }, + })).on('finish', () => { + assert.strictEqual(str, expected); + resolve(); + }); + + from.pipe(stream); + }); +}); + +test('SyncTransform - destroy()', async () => { + const stream = SyncTransform(); + stream.destroy(); + + await new Promise((resolve) => { + stream.on('close', common.mustCall(resolve)); + }); +}); + +test('SyncTransform - destroy(err)', async () => { + const stream = SyncTransform(); + stream.destroy(new Error('kaboom')); + + await new Promise((resolve) => { + stream.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + resolve(); + })); + }); +}); + +test('SyncTransform - works with pipeline', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + const stream2 = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toLowerCase()); + }); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('foo'), Buffer.from('bar')]; + + await new Promise((resolve, reject) => { + const sink = stringSink(expected); + pipeline(from, stream, stream2, sink, (err) => { + if (err) reject(err); + else resolve(); + }); + }); +}); + +test('SyncTransform - works with pipeline and handles errors', async () => { + const stream = 
SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + stream.on('close', common.mustCall()); + + const stream2 = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toLowerCase()); + }); + + stream2.on('close', common.mustCall()); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const sink = new Writable({ + write(chunk, enc, cb) { + cb(new Error('kaboom')); + }, + }); + + await new Promise((resolve) => { + pipeline(from, stream, stream2, sink, (err) => { + assert.ok(err); + resolve(); + }); + }); +}); + +test('SyncTransform - avoid ending pipe destination if { end: false }', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + const sink = stringSink(expected); + sink.on('finish', common.mustNotCall()); + + from.pipe(stream).pipe(sink, { end: false }); + + stream.on('end', () => { + setImmediate(resolve); + }); + }); +}); + +test('SyncTransform - this.push', async () => { + const stream = SyncTransform(function(chunk) { + this.push(Buffer.from(chunk.toString().toUpperCase())); + this.push(Buffer.from(chunk.toString())); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('foo'), Buffer.from('BAR'), Buffer.from('bar')]; + + await new Promise((resolve) => { + const sink = stringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - this.push objects', async () => { + const stream = SyncTransform(function(chunks) { + return chunks; + }); + const from = objectFrom([{ num: 1 }, { num: 2 }, { num: 3 }, { num: 4 }, { num: 5 }, { num: 6 }]); + const expected = [{ num: 1 }, { num: 2 }, { num: 3 }, { num: 4 }, { num: 5 }, { num: 6 }]; + + await new Promise((resolve) => { + const sink = objectSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - backpressure with push', async () => { + let wait = false; + + const stream = SyncTransform(function(chunk) { + assert.strictEqual(wait, false); + wait = true; + this.push(Buffer.from(chunk.toString().toUpperCase())); + this.push(Buffer.from(chunk.toString())); + setImmediate(() => { + wait = false; + }); + }); + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('foo'), Buffer.from('BAR'), Buffer.from('bar')]; + + await new Promise((resolve) => { + const sink = delayedStringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - returning null ends the stream', async () => { + const stream = SyncTransform(function() { + return null; + }); + + stream.on('data', common.mustNotCall()); + + await new Promise((resolve) => { + stream.on('end', common.mustCall(resolve)); + stream.write(Buffer.from('foo')); + }); +}); + +test('SyncTransform - returning null ends the stream deferred', async () => { + const stream = SyncTransform(function() { + return null; + }); + + stream.on('data', common.mustNotCall()); + + await new Promise((resolve) => { + stream.on('end', common.mustCall(resolve)); + + setImmediate(() => { + stream.write(Buffer.from('foo')); + }); + }); +}); + 
+test('SyncTransform - returning null ends the stream when piped', async () => { + const stream = SyncTransform(function() { + return null; + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + + await new Promise((resolve) => { + const sink = stringSink([]); + sink.on('finish', common.mustCall(resolve)); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - support flush', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }, function() { + return Buffer.from('done!'); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected = [Buffer.from('FOO'), Buffer.from('BAR'), Buffer.from('done!')]; + + await new Promise((resolve) => { + const sink = stringSink(expected, resolve); + sink.on('finish', common.mustCall()); + from.pipe(stream).pipe(sink); + }); +}); + +test('SyncTransform - adding on(\'data\') after pipe throws', () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + + const sink = new Writable(); + + stream.pipe(sink); + + assert.throws(() => { + stream.on('data', () => {}); + }, /you can use only pipe\(\) or on\('data'\)/); +}); + +test('SyncTransform - multiple data event', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]); + const expected1 = [Buffer.from('FOO'), Buffer.from('BAR')]; + const expected2 = [Buffer.from('FOO'), Buffer.from('BAR')]; + + await new Promise((resolve) => { + let count = 0; + const done = () => { + count++; + if (count === 4) resolve(); + }; + + stream.on('data', (chunk) => { + assert.strictEqual(chunk.toString(), expected1.shift().toString()); + done(); + }); + + stream.on('data', (chunk) => { + assert.strictEqual(chunk.toString(), expected2.shift().toString()); + done(); + }); + + from.pipe(stream); + }); +}); + +test('SyncTransform - piping twice errors', () => { + const stream = SyncTransform(); + stream.pipe(new Writable()); + + assert.throws(() => { + stream.pipe(new Writable()); + }, /multiple pipe not allowed/); +}); + +test('SyncTransform - removing on(\'data\') handlers', async () => { + const stream = SyncTransform(function(chunk) { + return Buffer.from(chunk.toString().toUpperCase()); + }); + const expected = [Buffer.from('FOO'), Buffer.from('BAR')]; + + function first(chunk) { + assert.strictEqual(chunk.toString(), expected.shift().toString()); + } + + function second() { + assert.fail('should never be called'); + } + + stream.on('data', first); + stream.on('data', second); + + stream.removeListener('data', second); + + await new Promise((resolve) => { + stream.write('foo'); + + stream.once('drain', () => { + stream.removeListener('data', first); + stream.on('data', first); + stream.write('bar'); + resolve(); + }); + }); +}); + +test('SyncTransform - double unpipe does nothing', () => { + const stream = SyncTransform(); + const dest = new Writable(); + + stream.pipe(dest); + stream.unpipe(dest); + stream.unpipe(dest); + + stream.write('hello'); +}); + +test('SyncTransform - must respect backpressure', async () => { + const stream = SyncTransform(); + + assert.strictEqual(stream.write('hello'), false); + + await new Promise((resolve) => { + stream.once('error', common.mustCall(() => { + resolve(); + })); + + assert.strictEqual(stream.write('world'), false); + }); +}); + +test('SyncTransform - works with 
pipeline and calls flush', async () => { + const expected = 'hello world!'; + let actual = ''; + + await new Promise((resolve, reject) => { + pipeline( + Readable.from('hello world'), + SyncTransform( + undefined, + function flush() { + this.push('!'); + } + ), + new Writable({ + write(chunk, enc, cb) { + actual += chunk.toString(); + cb(); + }, + }), + (err) => { + if (err) return reject(err); + assert.strictEqual(actual, expected); + resolve(); + } + ); + }); +}); + +test('SyncTransform - works with pipeline and calls flush / 2', async () => { + const expected = 'hello world!'; + let actual = ''; + + await new Promise((resolve, reject) => { + pipeline( + Readable.from('hello world'), + SyncTransform( + undefined, + function flush() { + return '!'; + } + ), + new Writable({ + write(chunk, enc, cb) { + actual += chunk.toString(); + cb(); + }, + }), + (err) => { + if (err) return reject(err); + assert.strictEqual(actual, expected); + resolve(); + } + ); + }); +}); + +test('SyncTransform - can be called without new', () => { + const stream = SyncTransform(); + assert.ok(stream instanceof SyncTransform); +}); + +test('SyncTransform - validates transform function', () => { + assert.throws(() => { + SyncTransform('not a function'); + }, { + code: 'ERR_INVALID_ARG_TYPE', + }); +}); + +test('SyncTransform - validates flush function', () => { + assert.throws(() => { + SyncTransform(null, 'not a function'); + }, { + code: 'ERR_INVALID_ARG_TYPE', + }); +});