With the introduction of `scramjet.from` and a range of static `from` methods on all Scramjet stream classes, we introduced a simple way to create streams from different sources. Just as a stream can be transformed, it can be generated quite as simply. Here's a piece on the ways to generate your chunks in Scramjet.
Let's start with the basics:
The idea behind `from` is to make sure our operand is a stream and, if not, to attempt to "streamify" it. In some older samples and references, scramjet required the following creation:
```javascript
fs.createReadStream(filePath)
    .pipe(new StringStream())
    .map(asyncMapper);
// and some more operations.
```
This is in most cases quite sensible, but I found the above code repeated so often that it reminded me of `Array.prototype.slice.call(something)`. So, like `Array.from`, the `scramjet.from` method was created.
If we pass another stream to `from`, it will:

- Check if the argument already is an instance of the class we're calling `from` on. If it is, then there's no need to create a new stream: we just return what was passed. So for instance `DataStream.from(new DataStream())` returns the argument itself, since it already is a `DataStream`.
- Otherwise (e.g. for a stream from `fs.createReadStream`), the `SomeStream.from` method will create a new `SomeStream` and will pipe the passed stream into the new one.

So now we can simply use:
```javascript
StringStream.from(fs.createReadStream(filePath))
    .map(asyncMapper);
// and some more operations.
```
This way we can make sure we get the `scramjet` goodness on any other stream.
Another recurring use case for `scramjet` is the need to asynchronously iterate over an Array, quite a common starting point for a project:
```javascript
const countries = [
    "it.json",
    "pl.json",
    "uk.json",
    "fr.json",
    "de.json",
    "es.json"
];

await (
    StringStream.from(countries)
        .parse(async name => loadFile(name))
        // and we get a stream of all contents
);
```
It also works similarly on Iterators and iterables.
Oh, I did fall in love with those Async Generators and immediately thought: this is the best way of creating chunks in some of my cases. For example, reading some 4-bit digital values from GPIO into a stream, but not more often than every second:
```javascript
const gpio = require("rpi-gpio-promise");
const sleep = require("sleep-promise");

DataStream
    .from(async function*(init) {
        await Promise.all([
            gpio.setup(init, gpio.DIR_IN),
            gpio.setup(init + 1, gpio.DIR_IN),
            gpio.setup(init + 2, gpio.DIR_IN),
            gpio.setup(init + 3, gpio.DIR_IN)
        ]);

        while (true) {
            yield await Promise.all([
                gpio.read(init),
                gpio.read(init + 1),
                gpio.read(init + 2),
                gpio.read(init + 3)
            ]);
            await sleep(1000);
        }
    }, 6) // anything passed here will be passed to the generator as initial arguments
    .map(([a, b, c, d]) => ({value: a*8 + b*4 + c*2 + d}))
    .each(console.log)
    .run();

// and we get:
// 1
// 3
// 7
// 4
// and so on.
```
Sometimes, I found, stream generators may depend on some synchronous or asynchronous logic, but I'd like the program to synchronously return a stream. After all, `fs.createReadStream` returns the stream before knowing if the file actually exists. And let's face it: we do know what we want returned, a `DataStream` for instance.
This led to the introduction of passing functions to `from`:
```javascript
async function getData(base) {
    const length = await requestPromise.get(`${base}/count`);

    if (length === 0) return ["no-entries"];
    else return request.get(`${base}/entries.csv`);
}

DataStream.from(getData, "https://myapi.example.com") // arguments passed here will be passed to the function
    .pipe(fs.createWriteStream("log.txt"));
```
So we can already pipe the data to the log even though we don't yet know where we'll get the stream from. Anything that the function returns will get passed to another `from` call, and its output will be piped to the stream that was returned in the beginning.
You can even return another function: like a redirect, `from` will try to resolve that too. Just keep in mind you can end up in an infinite loop here.
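The resolution step can be sketched like this (a hypothetical illustration of the behavior described above, not scramjet's actual code): keep calling the result while it is still a function, which is also exactly where the infinite loop lurks:

```javascript
// Hypothetical sketch of the "redirect" resolution described above.
// If a function keeps returning functions forever, this never terminates.
async function resolveSource(source, ...args) {
    let out = source;
    while (typeof out === "function") {
        out = await out(...args);
    }
    return out;
}

resolveSource(() => async () => ["chunk-1", "chunk-2"])
    .then(result => console.log(result)); // [ 'chunk-1', 'chunk-2' ]
```

Whatever the chain finally resolves to (an array, an iterable, another stream) is then handled as if it had been passed to `from` directly.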
Similarly to the above functions, passing a string to `from` will attempt to load a module. Let's consider these two files:
```javascript
/** @file lib/gen.js */

module.exports = (apiBase, key, secret) => {
    if (!valid(key, secret)) {
        throw new Error("Invalid key or secret");
    }

    return streamData(apiBase, key, secret);
};
```
```javascript
/** @file index.js */
DataStream.from("lib/gen", process.env.API_BASE, process.env.API_KEY, process.env.API_SECRET).pipe(
    fs.createWriteStream(process.argv[2])
);
```
In the second file we simply reference the relative location of the module (as we would pass it to `require`). Scramjet will load the module and use its `module.exports` as an argument for another `from` call, just like in the case of a function.
I hope this was a good read. If you think there's something to correct or add, please feel free to raise a new issue on GitHub.