Most of the time spent debugging streams goes into answering one question: why doesn't it flow?

Streaming speed

The first thing we need to know is whether the stream has a reason to flow. To understand that, it's worth thinking of reactive stream programming in terms of plumbing (electric circuitry is another good metaphor, but I tend to prefer water over electricity).

A stream-based program is like pipework.

Every call in a program can be envisioned as a pipe joint, so if we consider a simple program:


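const { StringStream } = require("scramjet");

// lines() is a StringStream method, so the chain starts from StringStream
StringStream.from(process.stdin)
    .lines()
    .parse(asyncParser)
    .filter(x => x && x.level <= 2)
    .pipe(process.stdout);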

Our plumbing looks like this:


stdin --> StringStream.from --> lines --> parse --> filter --> stdout

The stream above has an input, some transforms and an output, just as pipework would. The stream flows only as fast as the slowest element in that pipework. I won't claim that Bernoulli's principle applies here, but keep the picture in mind and you may be surprised by the similarities.

In the above program we can assume that the asyncParser method would be our bottleneck and the limiting factor.
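To see the slowest-element rule in numbers, here is a minimal sketch. It uses Node's built-in stream module rather than scramjet so that it runs without dependencies. The helpers delayStage and timePipeline are hypothetical names, and the fixed delays stand in for real work such as asyncParser.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream");
const { promisify } = require("util");

const pipelineAsync = promisify(pipeline);

// Hypothetical helper: a pass-through stage that needs `ms` per chunk.
function delayStage(ms) {
    return new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            setTimeout(() => callback(null, chunk), ms);
        }
    });
}

// Pushes `count` items through stages with the given per-chunk delays
// and resolves with the elapsed time in milliseconds.
async function timePipeline(delays, count) {
    const items = Array.from({ length: count }, (_, i) => ({ id: i }));
    const source = Readable.from(items);
    const sink = new Writable({
        objectMode: true,
        write(_chunk, _encoding, callback) {
            callback();
        }
    });

    const started = Date.now();
    await pipelineAsync(source, ...delays.map(delayStage), sink);
    return Date.now() - started;
}

// Usage: 20 items through stages that take 5, 20 and 5 ms per item.
timePipeline([5, 20, 5], 20).then(ms => {
    console.log(`20 items took ${ms} ms`);
});
```

The elapsed time should be dominated by the 20 ms stage (roughly 20 times 20 ms), because the faster stages work in parallel with it instead of adding their delays on top.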

Now let's change the program so that one point pushes data to two separate transforms, like this:


const { StringStream } = require("scramjet");

const step = StringStream.from(process.stdin).lines().parse(asyncParser);

step.filter(x => x && x.level <= 2).pipe(process.stdout);

// [0, 0] is the initial [count, lines] accumulator passed to reduce
const count = await step.reduce(([count, lines], ln) => [ln.level > 2 ? count : count + 1, lines + 1], [0, 0]);
console.log(`Processed ${count[0]} of ${count[1]} entries.`);

Our plumbing looks like this:


                                                 /--> filter --> stdout
stdin --> StringStream.from --> lines --> parse <
                                                 \--> reduce --> await

Now the stream will flow at the rate of the slower of our outputs.
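The effect of the slower output can be observed with a small experiment. This is a sketch built on Node's built-in stream module, not on scramjet; timedSink and forkDemo are hypothetical helpers, and the tiny highWaterMark is an assumption chosen to make back-pressure visible right away.

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical helper: a sink that needs `ms` to accept each chunk and
// records when every chunk arrived, in milliseconds since `started`.
function timedSink(ms, started, arrivals) {
    return new Writable({
        objectMode: true,
        highWaterMark: 1, // tiny buffer, so back-pressure kicks in immediately
        write(_chunk, _encoding, callback) {
            arrivals.push(Date.now() - started);
            setTimeout(callback, ms);
        }
    });
}

// Pipes one source into a fast and a slow sink and resolves with the
// arrival times seen by each of them.
function forkDemo(fastMs, slowMs, count) {
    return new Promise((resolve, reject) => {
        const started = Date.now();
        const fastArrivals = [];
        const slowArrivals = [];
        const source = Readable.from(Array.from({ length: count }, (_, i) => i));
        const fast = timedSink(fastMs, started, fastArrivals);
        const slow = timedSink(slowMs, started, slowArrivals);

        let open = 2;
        const done = () => {
            if (--open === 0) resolve({ fastArrivals, slowArrivals });
        };
        fast.on("finish", done).on("error", reject);
        slow.on("finish", done).on("error", reject);

        source.pipe(fast);
        source.pipe(slow);
    });
}

// Usage: a 1 ms sink and a 20 ms sink fed from the same source.
forkDemo(1, 20, 10).then(({ fastArrivals, slowArrivals }) => {
    console.log("fast sink got its last chunk after", fastArrivals[fastArrivals.length - 1], "ms");
    console.log("slow sink got its last chunk after", slowArrivals[slowArrivals.length - 1], "ms");
});
```

Both sinks should report a similar time for their last chunk, because the source pauses until every piped destination has drained.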

The flow may be slow, but it can also drop to zero, which means that your program gets stuck and stops operating. There are three possible reasons for this:

Reading flow rate

Part of debugging the flow is finding the flow rate of our stream at every point in the pipeline. Each stream attempts to consume items from the operations before it.

Although scramjet does not provide any specific method for this, the flow rate can be read pretty easily by inserting a simple use call:


const { StringStream } = require("scramjet");
const request = require("request");

StringStream.from(request.get('https://example.org/data.json'))
    .parse(asyncParser)
    .do(asyncOperation)
    // --- here begins ---
    .use(stream => {
        let cnt = 0;

        setInterval(() => {
            console.log(`Flow rate ${cnt} entries per second...`);
            cnt = 0;
        }, 1000 /* you may want to adapt this to your needs */);

        // count every chunk; always remember to return the stream!
        return stream.do(() => cnt++);
    })
    // --- here ends ---
    .filter(asyncFilter)
    .reduce(asyncReducer)

Adapt the time interval marked above to the flow rate you expect. For example, when parsing a log file I'd leave it at 1000 milliseconds, but for a stream of comments 60000 milliseconds may work better. If you change the interval, adjust the message too, since the counter then no longer holds entries per second.
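If the meter is needed in several places, the counter and the interval can be wrapped in one reusable stage. The following is a sketch using Node's built-in stream module rather than scramjet; flowMeter, intervalMs and report are hypothetical names. It normalises the count to entries per second, so the message stays correct for any interval, and it stops its timer when the stream finishes.

```javascript
const { Transform, finished } = require("stream");

// Hypothetical helper: a pass-through stage that counts chunks and reports
// the rate once per `intervalMs`, normalised to entries per second.
function flowMeter({ intervalMs = 1000, report = console.log } = {}) {
    let count = 0;

    const timer = setInterval(() => {
        const perSecond = (count * 1000) / intervalMs;
        report(`Flow rate ${perSecond.toFixed(1)} entries per second...`);
        count = 0;
    }, intervalMs);

    const meter = new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            count++;
            callback(null, chunk); // pass the chunk through untouched
        }
    });

    // Stop reporting once the stream has ended or failed,
    // otherwise the timer would keep the process alive.
    finished(meter, () => clearInterval(timer));

    return meter;
}

// Usage sketch (source and destination are placeholders):
// source.pipe(flowMeter({ intervalMs: 60000 })).pipe(destination);
```

Clearing the timer on finish is a design choice: if you are hunting a stream that never ends, the reports keep coming for as long as the stream stays open, which is exactly the case you want to observe.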

Results:


Flow rate 122 entries per second...
Flow rate 169 entries per second...
Flow rate 118 entries per second...
Flow rate 201 entries per second...

The rate fluctuates, but data keeps flowing every second.


Flow rate 63 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...

This point in the stream saw some initial elements, but after that nothing arrived.


Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...

No chunks reach this position at all.
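The three patterns above can also be told apart automatically, for example in a test or a watchdog. This is a minimal sketch; classifyFlow, its labels and the stallAfter threshold of three silent intervals are assumptions made for illustration.

```javascript
// Hypothetical helper: classifies a series of per-interval chunk counts.
// `stallAfter` is the number of trailing zero readings treated as a stall.
function classifyFlow(counts, stallAfter = 3) {
    // Nothing ever arrived (an empty series is treated the same way).
    if (counts.every(n => n === 0)) return "never started";

    // Something arrived earlier, but the latest readings are all zero.
    const tail = counts.slice(-stallAfter);
    const stalled = tail.length === stallAfter && tail.every(n => n === 0);

    return stalled ? "stalled" : "flowing";
}

// Usage with the readings shown above:
console.log(classifyFlow([122, 169, 118, 201])); // flowing
console.log(classifyFlow([63, 0, 0, 0]));        // stalled
console.log(classifyFlow([0, 0, 0, 0]));         // never started
```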

Reading the stream graph

Each scramjet stream has a method called graph, which provides a list of streams that are connected to the current instance. For now the graph can be read in only one direction, but future versions will make it more useful.

A simple helper shows where each stream originates in our code:


function printStreamGraph(str) {
    str.graph(instances => {
        console.log(str.name);
        instances.forEach(x => console.log(x.constructed.split("\n")[2]));
    });
}

Calling this on a stream will result in something like this:


DataStream(18)
    at DataStream.map (index.js:581:15)
    at DataStream.filter (lib/loader.js:507:25)
    at DataStream.flatMap (lib/loader.js:637:17)
    at StringStream.parse (lib/source.js:805:12)
    at StringStream.from (index.js:588:7)

Now it's just a matter of finding your suspect.
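Judging by the helper and its output, the constructed property appears to hold a stack trace captured when the stream was created, and index 2 of the split trace picks the caller. The sketch below is not scramjet's implementation; it only illustrates that general technique in plain Node.js, and TracedStage and originOf are hypothetical names.

```javascript
// Hypothetical class that remembers where each instance was created.
class TracedStage {
    constructor(name) {
        this.name = name;
        // V8 stack layout: line 0 is "Error", line 1 is this constructor,
        // line 2 is the code that called `new TracedStage(...)`.
        this.constructed = new Error().stack;
    }
}

// Returns the stack line that points at the creator of the stage.
function originOf(stage) {
    return stage.constructed.split("\n")[2].trim();
}

// Usage: the reported origin names the function that built the stage.
function buildPipeline() {
    return new TracedStage("parse");
}

console.log(originOf(buildPipeline()));
```

The printed line starts with "at buildPipeline", followed by the file and position, which is the same kind of entry as the ones listed in the graph output above.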

Summary

I hope this was a good read. If you think there is something to correct or add, please feel free to raise a new issue on GitHub.