Generators
Node.js v8 and v10 brought two new ways of handling and generating data: `function*` and `async function*` respectively. Scramjet brings generator and iterator protocol support to node.js streams and makes them easy to use as callbacks to standard methods.
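If you haven't used them much yet, here's a minimal, scramjet-free sketch of both kinds (the `counter` functions below are made up purely for illustration): a synchronous generator yields values lazily, while an async generator can await asynchronous work before each `yield`.

```js
// plain Node.js, no scramjet involved
function* counter(limit) {
    for (let i = 0; i < limit; i++) yield i;
}

async function* delayedCounter(limit) {
    for (let i = 0; i < limit; i++) {
        // e.g. some asynchronous work before producing the next value
        await new Promise(resolve => setTimeout(resolve, 10));
        yield i;
    }
}

(async () => {
    for (const n of counter(3)) console.log("sync:", n);               // -> sync: 0, 1, 2
    for await (const n of delayedCounter(3)) console.log("async:", n); // -> async: 0, 1, 2
})();
```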
Streams are great for data processing, but it must be said they're far from easy to use. Modules like `scramjet` or `highland.js` aim to make stream transforms easier to use, but there's always the matter of getting the stream in the first place.
In `scramjet`, since version 4.20 you have been able to use [async] generators in `DataStream.from` and `DataStream.use`, and in 4.24 I added that possibility to `pull`, `consume` and `flatMap` as well. This may not feel that important, but it actually makes a major difference in how you can use `scramjet` and how the data is consumed.
Let's see how this works in every method - I'm going to use `function*` and `async function*` where it makes sense for the example, but you can safely assume that asynchronous and synchronous generators are supported everywhere.
Generating a stream with the "from" method
Here's a simple generator that creates a stream of sequential numbers:
```js
const { DataStream } = require("scramjet");

DataStream.from(
    function* (start = 0) {
        let i = 0;
        while (i < 100) {
            // this is where we generate the data
            yield i++ + start;
        }
    },
    null, // options go here, but can be omitted
    10 // all extra arguments will be passed to a generator
)
    .stringify(x => `file.part.${x}.zip`)
    .consume(console.log)
    .then(() => console.log("done!"));

// -> file.part.10.zip
// -> file.part.11.zip
// -> file.part.12.zip
// -> file.part.13.zip
// ...
// -> file.part.109.zip
// -> done!
```
This will be useful for scenarios where lots of consecutive items have to be dealt with. But what if we'd actually want to check something, like the existence of a file on the filesystem? Here's where async generators come into play. An async generator is just an `async function` in which you can `yield` your data. Better yet, `scramjet` takes care of handling the generator so you get a ready stream like this:
```js
const fs = require("fs");
const util = require("util");
const { StringStream } = require("scramjet");

const readFile = util.promisify(fs.readFile);

StringStream.from(
    async function* (start = 0, end = 100) {
        let i = 0;
        while (i < end) {
            // this is where we generate the data
            const filename = `data.${i++ + start}.json`;
            try {
                // we simply yield the whole file here.
                yield readFile(filename, { encoding: "utf-8" });
            } catch (e) {
                // if the reading fails simply return (let's assume that the file doesn't exist)
                return;
            }
        }
    },
    null, // options go here, but can be omitted
    10, // all extra arguments will be passed to a generator
    39
)
    .JSONParse() // parses all entries as JSON
    .consume(console.log)
    .then(() => console.log("done!"));

// -> {contents: "of", file: [10]}
// -> {contents: "of", file: [11]}
// -> ...
// -> {contents: "of", file: [37]}
// -> done!
```
As you see, generators give us a good way of creating streams. The last example however is not as efficient as it could be. The generator cannot be run in parallel - you should think of it as a state machine that runs from `yield` to `yield` until `return`. But there's nothing keeping us from executing a couple of generators in parallel, which we can do further down the stream.
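To see what that sequential execution means in practice, here's a tiny scramjet-free sketch (`delay` and `slowNumbers` are just made-up helpers): even if we request several values from an async generator at once, the work between the `yield`s still runs one item at a time.

```js
// plain Node.js sketch - an async generator queues next() calls and resumes
// the same function body for each of them, so the awaited work never overlaps
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

async function* slowNumbers() {
    for (let i = 0; i < 3; i++) {
        await delay(100); // simulated I/O
        yield i;
    }
}

(async () => {
    const start = Date.now();
    const gen = slowNumbers();

    // asking for all three values "at once"...
    const results = await Promise.all([gen.next(), gen.next(), gen.next()]);

    console.log(results.map(r => r.value)); // -> [ 0, 1, 2 ]
    console.log(`~${Date.now() - start}ms`); // ~300ms - sequential, not ~100ms
})();
```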
Generating entries in "pull" and "flatMap"
In order to make the generators run in parallel, we can simply use the first generator to generate the stream entries synchronously, then run the asynchronous operations in another generator like this:
```js
// we'll be doing some reading so let's prepare a method here
// (readFile is the promisified fs.readFile from the previous example).
const readJson = async file => JSON.parse(await readFile(file, { encoding: "utf-8" }));

DataStream.from(function* () {
    yield 1;
    yield 2;
    yield 3;
})
    .flatMap(async function* (num) {
        // let's assume we need to fetch some index file
        const data = await readJson(`./dir-${num}/index.json`);

        // now that we got the index we can run the loop:
        for (let file of data) {
            const entries = await readJson(`./dir-${num}/${file}`);

            // here we output all entries for a file
            if (Array.isArray(entries)) yield* entries;
        }
    })
    .consume(console.log)
    .then(() => console.log("done!"));

// -> {data: "from", dir: 1, file: 1, entry: 1}
// -> {data: "from", dir: 2, file: 1, entry: 2}
// -> {data: "from", dir: 1, file: 2, entry: 1}
// -> {data: "from", dir: 1, file: 2, entry: 2}
// -> {data: "from", dir: 2, file: 2, entry: 1}
// -> {data: "from", dir: 1, file: 3, entry: 1}
// -> {data: "from", dir: 2, file: 3, entry: 1}
// -> ...
// -> done!
```
In this example we're reading whole directories, but instead of going one by one, we're outputting entries from the files in the order they're read. Note also that I used `yield*`. It's a handy feature that outputs anything iterable (including async iterables), flattening the output. Writing `yield* iterable` has the same effect as `for await (let c of iterable) yield c`.
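Here's a minimal, scramjet-free sketch of that equivalence (`inner` is just an illustrative helper) - both generators below produce exactly the same values:

```js
async function* inner() {
    yield 1;
    yield 2;
}

// delegates to inner(), flattening its output into this generator
async function* withYieldStar() {
    yield* inner();
}

// the spelled-out version of the same thing
async function* withForAwait() {
    for await (let c of inner()) yield c;
}

(async () => {
    for await (const v of withYieldStar()) console.log(v); // -> 1, 2
    for await (const v of withForAwait()) console.log(v);  // -> 1, 2
})();
```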
Another option that `scramjet` gives you is using an iterator in `pull` like this:
```js
// fetchJson is assumed to be a helper that GETs a URL and parses the response as JSON
const stream = new DataStream();

(async () => {
    const data = await fetchJson("https://example.org/index.json");
    for (let item of data) {
        await stream.pull(async function* () {
            for (let element of item) yield element.ref ? await fetchJson(element.ref) : element;
        });
    }
})()
    .catch(e => stream.raise(e))
    .then(() => stream.end());

return stream;
```
As shown, generators simplify the code when dealing with multiple asynchronous operations done at different levels. For instance, here's an alternative version of the last example without generators:
```js
const stream = new DataStream();

(async () => {
    const data = await fetchJson("https://example.org/index.json");
    for (let item of data) {
        await stream.whenWrote(item);
    }
})()
    .catch(e => stream.raise(e))
    .then(() => stream.end());

return stream.flatMap(async x => {
    const ret = [];
    for (let element of x) {
        if (element.ref) ret.push(await fetchJson(element.ref));
        else ret.push(element);
    }
    return ret;
});
```
Not only is the first version more readable, but chunks are also pushed one after another and there's no need to transform the stream with a `flatMap`.
If there's an interesting example you'd like to add, or you'd like to write an article yourself, fork the signicode/scramjet-articles repo and create a pull request.
Project co-financed by the European Union from the European Regional Development Fund under the Knowledge Education Development Program. The project is carried out as part of the National Centre for Research and Development competition: Szybka Ścieżka.