
Stream from anything

Michał Czapracki
CEO at Scramjet, Data Streaming Expert.

5 September 2018

Streaming anything as a Node.js stream

With the introduction of scramjet.from and a range of static from methods on all Scramjet stream classes, we added a simple way to create streams from different sources. A stream can now be generated just as simply as it is transformed. Here's a piece on the ways to generate your chunks in Scramjet.

Let's start with the basics:

From another stream

The idea behind from is to make sure our operand is a stream and, if it isn't, to attempt to "streamify" it. In some older samples and references, scramjet streams had to be created like this:


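```javascript
const fs = require("fs");
const { StringStream } = require("scramjet");

// filePath and asyncMapper are placeholders for your own values.
fs.createReadStream(filePath).pipe(new StringStream()).map(asyncMapper);
// and some more operations.
```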

In most cases this is quite sensible, but I found the code above repeated so often that it reminded me of Array.prototype.slice.call(something). So, just as Array.from replaced that idiom for arrays, the scramjet.from method was created for streams.
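For anyone who hasn't met the idiom, here is a minimal plain-JavaScript comparison of the two array conversions mentioned above (the arrayLike object is made up for illustration):

```javascript
// An array-like object: numeric keys plus a length, but no Array methods.
const arrayLike = { 0: "a", 1: "b", 2: "c", length: 3 };

// The old idiom: borrow Array.prototype.slice to copy it into a real array.
const viaSlice = Array.prototype.slice.call(arrayLike);

// The modern equivalent: a single static "from" call that accepts
// array-likes and iterables alike.
const viaFrom = Array.from(arrayLike);

console.log(viaSlice); // [ 'a', 'b', 'c' ]
console.log(viaFrom);  // [ 'a', 'b', 'c' ]
```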

If we pass another stream to from, it will:

  • First check whether the stream is already of the same class as the context of from (the class it was called on). If it is, there's no need to create a new stream, so we just return what was passed. For instance, in DataStream.from(new DataStream()) the argument already is a DataStream and is returned as-is.
  • Otherwise, if it's a stream of another class (say, one created by fs.createReadStream), SomeStream.from will create a new SomeStream and pipe the passed stream into it.
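The two rules above can be sketched with nothing but Node's built-in stream module. MyStream is a hypothetical class standing in for a scramjet class, and checking for a pipe method is a simplification made for this sketch; it illustrates the dispatch and is not scramjet's actual implementation:

```javascript
const { PassThrough, Readable } = require("stream");

// Hypothetical stream class standing in for DataStream or StringStream.
class MyStream extends PassThrough {
  static from(source) {
    // Rule 1: already an instance of this class, so return it unchanged.
    if (source instanceof this) return source;

    // Rule 2: some other stream, so create a new instance and pipe into it.
    // (Detecting streams by their pipe method is an assumption of this sketch.)
    if (source && typeof source.pipe === "function") {
      const target = new this({ objectMode: true });
      return source.pipe(target); // pipe() returns its destination
    }

    throw new TypeError("MyStream.from: unsupported source in this sketch");
  }
}

const existing = new MyStream({ objectMode: true });
const sameOne = MyStream.from(existing); // returned as-is

const foreign = Readable.from(["x", "y", "z"]); // not a MyStream
const wrapped = MyStream.from(foreign); // a new MyStream fed by foreign
```

Using new this(...) instead of a hard-coded constructor is what lets one static method serve every subclass.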

So now we can simply use:


```javascript
const fs = require("fs");
const { StringStream } = require("scramjet");

StringStream.from(fs.createReadStream(filePath)).map(asyncMapper);
// and some more operations.
```

This way we can use the scramjet goodness on any other stream.

From iterables

Another recurring use case for scramjet is asynchronously iterating over an Array, which is quite a common starting point for a project:


```javascript
const { StringStream } = require("scramjet");

const countries = [
    "it.json",
    "pl.json",
    "uk.json",
    "fr.json",
    "de.json",
    "es.json"
];

// loadFile is a placeholder for your own async loader.
const contents = StringStream.from(countries)
    .parse(async name => loadFile(name));
// and we get a stream of all contents
```

It works the same way with other iterables and iterators.
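Newer Node.js releases ship a built-in stream.Readable.from that accepts any iterable or async iterable, so the "streamify an array" idea can also be sketched without scramjet. The loadFile below is a hypothetical stand-in that returns placeholder text instead of reading a file, and mapAll is a helper made up for this sketch:

```javascript
const { Readable } = require("stream");

// Hypothetical loader: resolves placeholder text after a tick, no disk access.
async function loadFile(name) {
  await new Promise((resolve) => setImmediate(resolve));
  return `contents of ${name}`;
}

// Consume a stream, mapping each chunk asynchronously, and collect the results.
async function mapAll(stream, mapper) {
  const results = [];
  for await (const chunk of stream) results.push(await mapper(chunk));
  return results;
}

// Arrays, Sets and generator objects are all iterables, so each can be streamified.
function* moreCountries() {
  yield "fr.json";
  yield "de.json";
}

const fromArray = mapAll(Readable.from(["it.json", "pl.json", "uk.json"]), loadFile);
const fromSet = mapAll(Readable.from(new Set(["es.json"])), loadFile);
const fromGenerator = mapAll(Readable.from(moreCountries()), loadFile);
```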

From generators

Oh, I did fall in love with async generators and immediately thought: this is the best way to create the chunks in some of my cases. For example, reading 4-bit digital values from GPIO into a stream, but no more often than once a second:


```javascript
const gpio = require("rpi-gpio-promise");
const sleep = require("sleep-promise");
const { DataStream } = require("scramjet");

DataStream
    .from(async function* (init) {
        await Promise.all([
            gpio.setup(init, gpio.DIR_IN),
            gpio.setup(init + 1, gpio.DIR_IN),
            gpio.setup(init + 2, gpio.DIR_IN),
            gpio.setup(init + 3, gpio.DIR_IN)
        ]);

        while (true) {
            yield await Promise.all([
                gpio.read(init),
                gpio.read(init + 1),
                gpio.read(init + 2),
                gpio.read(init + 3)
            ]);
            await sleep(1000);
        }
    }, 6) // anything passed here will be passed to the generator as initial arguments.
    .map(([a, b, c, d]) => ({value: a*8 + b*4 + c*2 + d}))
    .each(({value}) => console.log(value))
    .run()
;

// and we get:
// 1
// 3
// 7
// 4
// and so on.
```
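The same pattern can be tried without a Raspberry Pi. In this sketch the GPIO reads are replaced by a hypothetical fixed table of pin states (readPins is made up), Node's built-in Readable.from consumes the async generator in place of DataStream.from, and the limit argument is an addition that lets the loop finish:

```javascript
const { Readable } = require("stream");

// Hypothetical pin states standing in for real gpio.read() results.
const samples = [
  [false, false, false, true],
  [false, false, true, true],
  [false, true, true, true],
  [false, true, false, false],
];

// Hypothetical async read: resolves one row of pin states.
async function readPins(i) {
  return samples[i % samples.length];
}

// Each yield becomes one chunk; arguments are passed when the generator is called.
async function* readNibbles(limit, delayMs) {
  for (let i = 0; i < limit; i++) {
    yield await readPins(i);
    await new Promise((resolve) => setTimeout(resolve, delayMs)); // throttle
  }
}

async function decodeAll() {
  const values = [];
  for await (const [a, b, c, d] of Readable.from(readNibbles(4, 10))) {
    // Booleans coerce to 0/1 in arithmetic, so the 4-bit formula still works.
    values.push(a * 8 + b * 4 + c * 2 + d);
  }
  return values;
}

const decoded = decodeAll();
decoded.then((values) => console.log(values)); // [ 1, 3, 7, 4 ]
```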

From functions

Sometimes, I found, the way a stream is generated depends on some synchronous or asynchronous logic, but I'd like the program to return a stream synchronously. After all, fs.createReadStream returns the stream before it knows whether the file actually exists. And let's face it: we do know what we want returned, a DataStream for instance.

This led to the introduction of passing functions to from:


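```javascript
const fs = require("fs");
const request = require("request");
const requestPromise = require("request-promise");
const { DataStream } = require("scramjet");

async function getData(base) {
    // request-promise resolves with the response body as a string, so convert it.
    const length = Number(await requestPromise.get(`${base}/count`));

    if (length === 0) return [";no-entries"];
    else return request.get(`${base}/entries.csv`);
}

DataStream.from(getData, "https://myapi.example.com") // arguments passed here will be passed to the function
    .pipe(fs.createWriteStream("log.txt"));
```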

So we can pipe the data to the log right away, even though we don't yet know where the stream will come from in the end. Whatever the function returns is passed to another from call, and the output of that is piped into the stream that was returned at the beginning.

You can even return another function; like a redirect, it will be resolved in turn. Just keep in mind that this can lead to an infinite loop.
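Here is a rough sketch of the mechanism described above, built on Node's stream module rather than scramjet: the stream is returned synchronously, the function's result is resolved later and piped in, and function "redirects" are followed up to a limit. The name fromFunction, the limit of 10 hops and passing the same arguments to every redirect are all assumptions of this sketch, and getData here is a hypothetical offline stand-in for the one above:

```javascript
const { PassThrough, Readable, pipeline } = require("stream");

// Returns a stream right away; the function's result is resolved later
// and piped in. A sketch of the idea, not scramjet's implementation.
function fromFunction(fn, ...args) {
  const out = new PassThrough({ objectMode: true });

  (async () => {
    let result = await fn(...args);

    // Follow "redirects" (a function returning a function), but give up
    // after 10 hops so a self-returning function cannot loop forever.
    for (let hops = 0; typeof result === "function"; hops++) {
      if (hops >= 10) throw new Error("too many function redirects");
      result = await result(...args);
    }

    // Streams are used as-is; anything else must be (async) iterable.
    const source = typeof result.pipe === "function" ? result : Readable.from(result);

    // pipeline() tears down both streams if either one fails.
    pipeline(source, out, () => {});
  })().catch((err) => out.destroy(err));

  return out;
}

// Hypothetical source that only knows what to return after an await.
async function getData(base) {
  // Pretend we asked `${base}/count` and the API reported zero entries.
  const count = await Promise.resolve(0);
  return count === 0 ? ["no-entries"] : [`${base}/entries.csv`];
}

// A "redirect": a function that returns another function.
const redirecting = () => getData;

const stream = fromFunction(getData, "https://myapi.example.com"); // returned synchronously
```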

From modules

Similarly to the functions above, passing a string to from will attempt to load a module. Let's consider these two files:


```javascript
/** @file lib/gen.js */

// valid() and streamData() are placeholders for your own logic.
module.exports = (apiBase, key, secret) => {
    if (!valid(key, secret)) {
        throw new Error("Invalid key or secret");
    }

    return streamData(apiBase, key, secret);
};
```


```javascript
/** @file index.js */
const fs = require("fs");
const { DataStream } = require("scramjet");

DataStream.from("lib/gen", process.env.API_BASE, process.env.API_KEY, process.env.API_SECRET).pipe(
    fs.createWriteStream(process.argv[2])
);
```

In the second file we simply reference the relative location of the module (as we would pass it to require). Scramjet will load the module and use its module.exports as the argument for another from call, as in the case of a function; the remaining arguments are passed along to it.
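A rough sketch of the module case, again in plain Node.js rather than scramjet. fromModule is a hypothetical helper; resolving the path against process.cwd() and supporting only synchronous exports are assumptions of this sketch (scramjet's own resolution rules may differ), and the demo writes a throwaway module to a temporary directory so it is self-contained:

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");
const { Readable } = require("stream");

// Load a module with require() and treat its exports like the function case.
function fromModule(modulePath, ...args) {
  const exported = require(path.resolve(process.cwd(), modulePath));
  const result = typeof exported === "function" ? exported(...args) : exported;
  return typeof result.pipe === "function" ? result : Readable.from(result);
}

// Demo: write a tiny hypothetical generator module to a temp directory.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "from-module-"));
const modFile = path.join(dir, "gen.js");
fs.writeFileSync(
  modFile,
  `module.exports = (apiBase, key) => {
     if (!key) throw new Error("Invalid key");
     return [apiBase + "/a", apiBase + "/b"];
   };`
);

const stream = fromModule(modFile, "https://api.example.com", "secret");
```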

Summary

I hope this was a good read. If you think there's something to correct or add, please feel free to raise a new issue on GitHub.

Project co-financed by the European Union from the European Regional Development Fund under the Knowledge Education Development Program. The project is carried out as part of the competition of the National Centre for Research and Development: Szybka Ścieżka.