
Generators

Michał Czapracki
CEO at Scramjet, Data Streaming Expert.

7 May 2019

Node.js v8 and v10 brought two new ways of handling and generating data: function* and async function* respectively. Scramjet brings generator and iterator protocol support to Node.js streams and makes them easy to use as callbacks to standard methods.
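
As a quick refresher, a generator is a function that can suspend itself at each yield, and an async generator is the same thing with await available inside; both produce objects that follow the iterator protocol. Here's a minimal, scramjet-independent sketch:

// a synchronous generator - produces values lazily, one per .next() call
function* counter() {
    yield 1;
    yield 2;
}

// an asynchronous generator - values may be awaited before they're yielded
async function* delayedCounter() {
    yield 1;
    yield await Promise.resolve(2);
}

for (const x of counter()) console.log(x); // -> 1, 2

(async () => {
    for await (const x of delayedCounter()) console.log(x); // -> 1, 2
})();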

Streams are great for data processing, but it must be said they're far from easy to use. Modules like scramjet or highland.js aim to make stream transforms easier to use, but there's always the matter of getting the stream in the first place.

In scramjet you've been able to use [async] generators in DataStream.from and DataStream.use since version 4.20, and in 4.24 I added support for them in pull, consume and flatMap as well. This may not feel that important, but it actually makes a major difference in how you can use scramjet and how the data is consumed.

Let's see how this works in each method - I'm going to use function* and async function* where it makes sense for the example, but you can safely assume that both asynchronous and synchronous generators are supported everywhere.

Generating a stream with "from" method

Here's a simple generator that creates a stream of sequential numbers:


const { DataStream } = require("scramjet");

DataStream.from(
    function* (start = 0) {
        let i = 0;
        while (i < 100) {
            // this is where we generate the data
            yield i++ + start;
        }
    },
    null, // options go here, but can be omitted
    10 // all extra arguments will be passed to the generator
)
    .stringify(x => `file.part.${x}.zip`)
    .consume(console.log)
    .then(() => console.log("done!"));

// -> file.part.10.zip
// -> file.part.11.zip
// -> file.part.12.zip
// -> file.part.13.zip
// ...
// -> file.part.109.zip
// -> done!

This will be useful in scenarios where lots of consecutive items have to be dealt with. But what if we actually wanted to check something, like the existence of a file on the filesystem? Here's where async generators come into play. An async generator is just an async function in which you can yield your data. Better yet, scramjet takes care of handling the generator so you get a ready stream like this:


const fs = require("fs");
const util = require("util");
const { StringStream } = require("scramjet");

const readFile = util.promisify(fs.readFile);

StringStream.from(
    async function* (start = 0, end = 100) {
        let i = 0;
        while (i < end) {
            // this is where we generate the data
            const filename = `data.${i++ + start}.json`;
            try {
                // we simply yield the whole file here
                yield readFile(filename, { encoding: "utf-8" });
            } catch (e) {
                // if the read fails, simply return (let's assume the file doesn't exist)
                return;
            }
        }
    },
    null, // options go here, but can be omitted
    10, // all extra arguments will be passed to the generator
    39
)
    .JSONParse() // parses all entries as JSON
    .consume(console.log)
    .then(() => console.log("done!"));

// -> {contents: "of", file: [10]}
// -> {contents: "of", file: [11]}
// -> ...
// -> {contents: "of", file: [37]}
// -> done!

As you can see, generators give us a good way of creating streams. The last example, however, is not as efficient as it could be. A generator cannot be run in parallel - you should think of it as a state machine that runs from yield to yield until return. But there's nothing keeping us from executing a couple of generators in parallel, which we can do further down the stream.
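
To illustrate the state-machine point with a standalone sketch (the delay helper below is just a stand-in for any asynchronous operation): even with an instant consumer, three 100 ms operations inside a single async generator take roughly 300 ms, because the generator only resumes after each yielded value is consumed.

const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

async function* serialSource() {
    for (let i = 0; i < 3; i++) {
        await delay(100); // the generator is suspended here...
        yield i;          // ...and resumes only once this value has been consumed
    }
}

(async () => {
    const start = Date.now();
    for await (const x of serialSource()) console.log(x);
    console.log(`took ~${Date.now() - start} ms`); // -> took ~300 ms, serial not parallel
})();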

Generating entries in "pull" and "flatMap"

In order to make the generators run in parallel, we can simply use the first generator to create the stream entries synchronously, then run the asynchronous operations in another generator, like this:


// we'll be doing some reading, so let's prepare a method here
// (readFile is the promisified fs.readFile from the previous example)
const readJson = async file => JSON.parse(await readFile(file, { encoding: "utf-8" }));

DataStream.from(function* () {
    yield 1;
    yield 2;
    yield 3;
})
    .flatMap(async function* (num) {
        // let's assume we need to fetch some index file
        const data = await readJson(`./dir-${num}/index.json`);

        // now that we have the index we can run the loop:
        for (let file of data) {
            const entries = await readJson(`./dir-${num}/${file}`);

            // here we output all entries for a file
            if (Array.isArray(entries)) yield* entries;
        }
    })
    .consume(console.log)
    .then(() => console.log("done!"));

// -> {data: "from", dir: 1, file: 1, entry: 1}
// -> {data: "from", dir: 2, file: 1, entry: 2}
// -> {data: "from", dir: 1, file: 2, entry: 1}
// -> {data: "from", dir: 1, file: 2, entry: 2}
// -> {data: "from", dir: 2, file: 2, entry: 1}
// -> {data: "from", dir: 1, file: 3, entry: 1}
// -> {data: "from", dir: 2, file: 3, entry: 1}
// -> ...
// -> done!

In this example we're reading whole directories, but instead of going one by one, we're outputting entries from the files in the order they're read. Note also that I used yield*. It's a handy feature that outputs anything iterable (including async iterables), flattening the output. Writing yield* iterable has the same effect as for await (let c of iterable) yield c.
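
In other words, these two illustrative generators behave identically:

async function* withYieldStar(iterable) {
    yield* iterable; // delegates to the iterable, flattening it into our output
}

async function* withForAwait(iterable) {
    for await (const c of iterable) yield c;
}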

Another option scramjet gives you is using a generator in pull, like this:


const stream = new DataStream();

(async () => {
    // fetchJson here stands for a helper that fetches a URL and parses the response as JSON
    const data = await fetchJson("https://example.org/index.json");
    for (let item of data) {
        await stream.pull(async function* () {
            for (let element of item) yield element.ref ? await fetchJson(element.ref) : element;
        });
    }
})()
    .catch(e => stream.raise(e))
    .then(() => stream.end());

return stream;

As shown, generators simplify the code when dealing with multiple asynchronous operations done at different levels. For instance, here's an alternative version of the last example without generators:


const stream = new DataStream();

(async () => {
    const data = await fetchJson("https://example.org/index.json");
    for (let item of data) {
        await stream.whenWrote(item);
    }
})()
    .catch(e => stream.raise(e))
    .then(() => stream.end());

return stream.flatMap(async x => {
    const ret = [];
    for (let element of x) {
        if (element.ref) ret.push(await fetchJson(element.ref));
        else ret.push(element);
    }
    return ret;
});

Not only is the first version more readable, but the chunks are also pushed one after another, and there's no need to transform the stream with a flatMap.

If there's an interesting example you'd like to add, or you'd like to write an article yourself, fork the signicode/scramjet-articles repo and create a pull request.

Project co-financed by the European Union from the European Regional Development Fund under the Knowledge Education Development Program. The project is carried out as part of the National Centre for Research and Development competition: Szybka Ścieżka.