Most of the time spent debugging streams goes into answering one question: why doesn't it flow?
What we need to know first is whether the stream has a reason to flow at all. To understand that, it's worth thinking of reactive stream programming through a plumbing metaphor (electric circuitry is another good alternative, but I tend to prefer water over electricity).
A stream-based program is like pipework.
Every call in the program can be envisioned as a pipe joint, so let's consider a simple program:
```js
StringStream.from(process.stdin)
    .lines()
    .parse(asyncParser)
    .filter(x => x && x.level <= 2)
    .pipe(process.stdout);
```
Our plumbing looks like this:
```
stdin --> StringStream.from --> lines --> parse --> filter --> stdout
```
The stream above has an input, some transforms and an output, just as piping would. Now, the stream will flow only as fast as the slowest element in that piping. I won't go as far as claiming you can apply Bernoulli's principle here, but give the analogy some thought and you'll be amazed how far the similarities go.
In the above program we can assume that the `asyncParser` method is our bottleneck and the limiting factor.
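To make the bottleneck concrete, here is one way such a parser could look. This is only a hypothetical sketch: `asyncParser` is never defined in the snippet above, and `lookupUser` stands in for whatever slow asynchronous work your parser actually does.

```js
// Hypothetical example of a parser that becomes the bottleneck:
// every line waits on an external lookup before it can move on, so the
// whole pipeline flows only as fast as these lookups resolve.
// `lookupUser` is an assumed helper, not part of the program above.
const asyncParser = async (line) => {
    let entry;
    try {
        entry = JSON.parse(line);                 // cheap, synchronous work
    } catch (e) {
        return null;                              // dropped later by `x => x && ...`
    }
    entry.user = await lookupUser(entry.uid);     // slow, asynchronous work
    return entry;
};
```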
Now let's change the program so that one point in the pipeline pushes data to two separate transforms, like this:
```js
const step = StringStream.from(process.stdin).lines().parse(asyncParser);

step.filter(x => x && x.level <= 2).pipe(process.stdout);

const count = await step.reduce(
    ([count, lines], ln) => [ln.level > 2 ? count : count + 1, lines + 1],
    [0, 0]
);
console.log(`Processed ${count[0]} of ${count[1]} entries.`);
```
Our plumbing looks like this:
```
                                                 /--> filter --> stdout
stdin --> StringStream.from --> lines --> parse <
                                                 \--> reduce --> await
```
Now the stream will flow at the rate of the slower of our outputs.
The flow may be slow, but it can also drop to zero, which means that your program will get stuck and stop operating. There are three possible situations you can find yourself in, and measuring the flow rate, as described below, will tell you which one you're dealing with.
Part of debugging the flow is finding the flow rate of our stream at every point in the pipeline. Each stream will attempt to consume some items from the previous operations.
Although scramjet does not provide any specific method for this, the flow rate can be read pretty easily by inserting this simple `use` command:
```js
StringStream.from(request.get('https://example.org/data.json'))
    .parse(asyncParser)
    .do(asyncOperation)
    // --- here begins ---
    .use(stream => {
        let cnt = 0;
        stream.do(() => cnt++);

        setInterval(() => {
            console.log(`Flow rate ${cnt} entries per second...`);
            cnt = 0;
        }, 1000 /* you may want to adapt this to your needs */);

        return stream; // always remember to return the stream!
    })
    // --- here ends ---
    .filter(asyncFilter)
    .reduce(asyncReducer)
```
Depending on the flow rate you expect, you should adapt the time interval marked above. For example, when parsing a log file I'd leave it at 1000 milliseconds, but when reading a stream of comments 60000 milliseconds may be a better choice.
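If you reach for this trick often, you can wrap it in a small reusable helper. The sketch below relies only on the `use` and `do` methods shown above; the `flowMeter` name and its `label` and `interval` parameters are my own, not part of scramjet:

```js
// A reusable flow-rate probe built from the snippet above.
// Only `use` and `do` come from scramjet; the rest is plain Node.
// The helper name and parameters are illustrative, not a scramjet API.
const flowMeter = (label, interval = 1000) => stream => {
    let cnt = 0;
    stream.do(() => cnt++);

    const timer = setInterval(() => {
        console.log(`[${label}] flow rate ${cnt} entries per ${interval} ms...`);
        cnt = 0;
    }, interval);
    timer.unref(); // don't keep the process alive just for the meter

    return stream; // always remember to return the stream!
};
```

With that in place, `.use(flowMeter('after parse'))` can be dropped in at as many points of the pipeline as you need.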
Results:
```
Flow rate 122 entries per second...
Flow rate 169 entries per second...
Flow rate 118 entries per second...
Flow rate 201 entries per second...
```
The flow may be uneven, but it keeps flowing every second.
If you see the `use` command work only at the very beginning, the output will be something like this:
```
Flow rate 63 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
```
The `use` command has seen some initial elements, but after that nothing happened.
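One common way to end up in this state (a hypothetical example, not taken from the program above) is a transform whose promise never settles for a particular chunk: everything flows until that chunk arrives, and then the stage stops handing data on.

```js
// Hypothetical transform that stalls the pipeline: for some inputs the
// returned promise never settles, so the chunk never leaves this stage
// and the flow rate measured downstream drops to zero.
const asyncOperation = entry =>
    new Promise(resolve => {
        if (entry.level !== undefined) {
            resolve(entry);
        }
        // ...but when `level` is missing we never call resolve(),
        // and the stream stops flowing from here on.
    });
```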
If the `use` command is not working at all, the output will look like this:
```
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
```
No chunks reach this position at all, which means that the stream is blocked somewhere upstream, before the point where the probe was inserted.
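A practical way to narrow the blockage down is to place the probe at several points and compare the readings: the last probe that still shows traffic and the first one that stays at zero bracket the suspect stage. Below is a sketch reusing the hypothetical `flowMeter` helper from earlier and assuming `use` can be chained at each of these points just like in the snippet above.

```js
// Probes at several points let you bisect the pipeline: the stage between
// the last non-zero reading and the first all-zero reading is the suspect.
// flowMeter is the illustrative helper sketched earlier, not a scramjet built-in.
StringStream.from(request.get('https://example.org/data.json'))
    .parse(asyncParser)
    .use(flowMeter('after parse'))
    .do(asyncOperation)
    .use(flowMeter('after do'))
    .filter(asyncFilter)
    .use(flowMeter('after filter'))
    .reduce(asyncReducer);
```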
Each scramjet stream has a method called `graph`, which returns a list of the streams connected to the current instance. For now this can only be read one way, but in future versions it will become more useful.
A simple function helps us find where our stream originates in our code:
```js
function printStreamGraph(str) {
    str.graph(instances => {
        // print the name of the stream we're inspecting...
        console.log(str.name);
        // ...and, for every connected stream, one line of the stack trace
        // captured when that stream was constructed
        instances.forEach(x => console.log(x.constructed.split("\n")[2]));
    });
}
```
Calling this on a stream will result in something like this:
```
DataStream(18)
    at DataStream.map (index.js:581:15)
    at DataStream.filter (lib/loader.js:507:25)
    at DataStream.flatMap (lib/loader.js:637:17)
    at StringStream.parse (lib/source.js:805:12)
    at StringStream.from (index.js:588:7)
```
Now it's just a matter of finding your suspect.
I hope this was a good read. If you think there's something to correct or add, please feel free to raise a new issue on GitHub.