A transform stream that collects chunks and passes them on as batches.

ovhemert/batch2

batch2

Example

Let's say you want to stream documents from a MongoDB collection to ElasticSearch. Every document in the collection reaches the write stream as a separate write, and for each one you call the ElasticSearch index function. That means 100,000 documents result in 100,000 API calls.

The ElasticSearch library has a bulk index function, but because the stream delivers one document per write, we cannot group multiple index operations together.

The batch2 transform stream helps by buffering the chunks (documents) and passing them on as batches. For example, with batches of 500 documents each, the number of API calls to ElasticSearch drops from 100,000 to 200 (100,000 / 500), which improves speed.

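  const batch2 = require('batch2')

  // mongoReadStream, transformToElasticBulkOperation and elasticWriteStream are placeholders for your own streams
  mongoReadStream()
    .pipe(batch2({ size: 5 })) // transforms multiple chunks (mongo docs) to [chunk, chunk, chunk, chunk, chunk]
    .pipe(transformToElasticBulkOperation())
    .pipe(elasticWriteStream())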
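
To make the buffering idea concrete, here is a minimal sketch of a batching transform built directly on Node's built-in `stream.Transform`. It is not batch2's actual source: `createBatcher`, its `size` parameter and the `collect` helper are hypothetical names used only for illustration. The stream collects incoming objects and pushes an array every time `size` of them have arrived, then flushes the remaining partial batch when the input ends.

```javascript
const { Transform, Readable } = require('stream')

// Hypothetical helper (not part of batch2): groups incoming objects into arrays of `size`
function createBatcher (size) {
  let batch = []
  return new Transform({
    objectMode: true, // chunks are JavaScript objects (e.g. documents), not bytes
    transform (chunk, encoding, callback) {
      batch.push(chunk)
      if (batch.length >= size) {
        const full = batch
        batch = []
        callback(null, full) // emit one array per `size` chunks
      } else {
        callback() // keep buffering; nothing to emit yet
      }
    },
    flush (callback) {
      // Emit the last, possibly smaller, batch when the input ends
      if (batch.length > 0) this.push(batch)
      batch = []
      callback()
    }
  })
}

// Hypothetical helper: collects everything a readable stream emits into an array
function collect (stream) {
  return new Promise((resolve, reject) => {
    const out = []
    stream.on('data', (item) => out.push(item))
    stream.on('end', () => resolve(out))
    stream.on('error', reject)
  })
}

// Usage: 12 "documents" in batches of 5
const docs = Array.from({ length: 12 }, (_, i) => ({ id: i }))
collect(Readable.from(docs).pipe(createBatcher(5)))
  .then((batches) => console.log(batches.map((b) => b.length))) // [ 5, 5, 2 ]
```

Because the last batch can be smaller than `size`, a downstream step such as building a bulk request should not assume every batch has a fixed length.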

Installation

$ npm install batch2

API

batch2([options], [transformFunction])

Consult the stream.Transform documentation for the exact rules of the transformFunction (i.e. this._transform) and the optional flushFunction (i.e. this._flush).

options

The options argument is optional and is passed straight through to stream.Transform, so you can set objectMode: true when processing non-binary streams (or simply use batch2.obj()).
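
The sketch below illustrates what "passed straight through" means, assuming only Node's built-in `stream` module. `makeTransform` and `makeTransform.obj` are hypothetical stand-ins modeled on the README's description of `batch2()` and `batch2.obj()`; they are not batch2's actual code and they do no batching. The usage part shows why object mode matters: a default (byte) stream rejects plain objects such as database documents.

```javascript
const { Transform } = require('stream')

// Forwards every chunk unchanged; used when no transformFunction is supplied
function passThrough (chunk, encoding, callback) {
  callback(null, chunk)
}

// Hypothetical factory (not batch2's source): `options` go straight to stream.Transform
function makeTransform (options, transformFunction) {
  if (typeof options === 'function') {
    // Both arguments are optional, so a lone function is the transformFunction
    transformFunction = options
    options = {}
  }
  return new Transform({ ...options, transform: transformFunction || passThrough })
}

// Shorthand in the spirit of batch2.obj(): the same factory with objectMode forced on
makeTransform.obj = function (options, transformFunction) {
  if (typeof options === 'function') return makeTransform({ objectMode: true }, options)
  return makeTransform({ ...options, objectMode: true }, transformFunction)
}

// Usage: an object-mode stream accepts any JavaScript value...
const objects = makeTransform.obj()
objects.write({ id: 1 })
console.log(objects.read()) // { id: 1 }

// ...while a byte stream only accepts strings, Buffers and Uint8Arrays
const binary = makeTransform()
try {
  binary.write({ id: 1 })
} catch (err) {
  console.log(err.code) // ERR_INVALID_ARG_TYPE (thrown synchronously on current Node versions)
}
```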

transformFunction

The transformFunction must have the following signature: function (chunk, encoding, callback) {}. A minimal implementation should call the callback function to indicate that the transformation is done, even if that transformation means discarding the chunk.

To queue a new chunk, call this.push(chunk). You can call it as many times as required before calling callback() if you have multiple pieces to send on.

Alternatively, you may use callback(err, chunk) as shorthand for emitting a single chunk or an error.

If you do not provide a transformFunction then you will get a simple pass-through stream.
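
The rules above are those of Node's own `stream.Transform`, which the README points to. The following sketch applies them to a plain object-mode `stream.Transform`; `createWordSplitter` and its word-splitting logic are purely illustrative and not part of batch2. It shows discarding a chunk with a bare `callback()`, pushing several pieces with `this.push()`, the `callback(null, chunk)` shorthand, and reporting an error with `callback(err)`.

```javascript
const { Transform, Readable } = require('stream')

// Hypothetical example transform (not part of batch2): splits text chunks into words
function createWordSplitter () {
  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      if (typeof chunk !== 'string') {
        // callback(err) reports an error instead of emitting a chunk
        callback(new TypeError('expected a string chunk'))
        return
      }
      const words = chunk.split(/\s+/).filter(Boolean)
      if (words.length === 0) {
        // Discard the chunk: signal that we are done without pushing anything
        callback()
        return
      }
      // One chunk can produce several pieces: push all but the last...
      for (const word of words.slice(0, -1)) this.push(word)
      // ...then use callback(null, chunk) as shorthand for push(chunk) + callback()
      callback(null, words[words.length - 1])
    }
  })
}

// Usage: blank input is discarded, the rest is split into individual words
const words = []
Readable.from(['hello batch2', '   ', 'streams'])
  .pipe(createWordSplitter())
  .on('data', (word) => words.push(word))
  .on('end', () => console.log(words)) // [ 'hello', 'batch2', 'streams' ]
```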

Maintainers

Osmond van Hemert (GitHub, Web)

Contributing

If you would like to help out with some code, check the details.

Not a coder, but still want to support? Have a look at the options available to donate.

License

Licensed under MIT.