Oh my! The stream does not flow!
Most of the time spent debugging streams goes into answering one question: why doesn't it flow?
Streaming speed
The first thing we need to know is whether the stream has any reason to flow. To understand that, it's worth thinking of reactive stream programming in terms of plumbing (electric circuitry is another good metaphor, but I prefer water to electricity).
A stream-based program is like pipework.
Every call in the program can be envisioned as a pipe joint. Consider a simple program:
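```javascript
const { StringStream } = require("scramjet");

// asyncParser and stdout stand for your own parser function and output stream
StringStream.from(process.stdin)
  .lines()
  .parse(asyncParser)
  .filter(x => x && x.level <= 2)
  .pipe(stdout);
```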
Our plumbing looks like this:
```
stdin --> StringStream.from --> lines --> parse --> filter --> stdout
```
The stream above has an input, some transforms and an output, just as a plumbing system does. The stream will flow only as fast as the slowest element in this pipework. I won't claim you can apply Bernoulli's principle here, but I do suggest keeping the analogy in mind; you may be surprised by the similarities.
In the program above we can assume that the `asyncParser` method is the bottleneck and the limiting factor.
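To see the "slowest element sets the pace" rule in action without external dependencies, here is a minimal sketch using Node's built-in `stream` module rather than scramjet. The `delayStage` and `runPipeline` helpers are hypothetical names made up for this example; each stage simply waits a fixed number of milliseconds per item, and the 20 ms stage plays the role of `asyncParser`.

```javascript
const { Readable, Transform, Writable, pipeline } = require("node:stream");

// Hypothetical helper: a transform stage that spends `ms` milliseconds per item.
function delayStage(ms) {
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      setTimeout(() => callback(null, chunk), ms);
    },
  });
}

// Push `items` through stages with the given per-item delays and time the run.
function runPipeline(items, delays) {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    let count = 0;
    const sink = new Writable({
      objectMode: true,
      write(_chunk, _enc, callback) {
        count++;
        callback();
      },
    });
    pipeline(Readable.from(items), ...delays.map(delayStage), sink, (err) =>
      err ? reject(err) : resolve({ count, elapsed: Date.now() - start })
    );
  });
}

// 20 items: a 10 ms stage, a slow 20 ms stage ("asyncParser"), another 10 ms stage.
// The stages work concurrently, so the run takes roughly 20 x 20 ms,
// not 20 x (10 + 20 + 10) ms.
runPipeline(Array.from({ length: 20 }, (_, i) => i), [10, 20, 10]).then(
  ({ count, elapsed }) => console.log(`${count} items in ${elapsed} ms`)
);
```

Because the stages overlap, making the 10 ms stages faster would barely change the total; only speeding up the 20 ms stage would.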
Now let's change the program so that one point in the stream pushes data to two separate transforms:
```javascript
const step = StringStream.from(process.stdin).lines().parse(asyncParser);

step.filter(x => x && x.level <= 2).pipe(stdout);

const [matched, total] = await step.reduce(
  ([matched, total], ln) => [ln && ln.level <= 2 ? matched + 1 : matched, total + 1],
  [0, 0] // initial accumulator: [matching entries, all entries]
);
console.log(`Processed ${matched} of ${total} entries.`);
```
Our plumbing now looks like this:
```
                                                 /--> filter --> stdout
stdin --> StringStream.from --> lines --> parse <
                                                 \--> reduce --> await
```
Now the stream will flow at the rate of the slower of our outputs.
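The same effect can be reproduced with plain Node streams (again a sketch, not scramjet code): one source piped into two outputs. Node's `pipe()` pauses the source whenever any destination signals backpressure, so the fast output cannot get ahead of the slow one by more than a buffer's worth. The `timedSink` and `fanOut` helpers are hypothetical names for this example.

```javascript
const { Readable, Writable } = require("node:stream");

// Hypothetical helper: a sink that spends `ms` milliseconds on each item.
function timedSink(ms) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // tiny buffer so backpressure kicks in immediately
    write(_chunk, _enc, callback) {
      setTimeout(callback, ms);
    },
  });
}

// Pipe one source into a fast and a slow output and time when each finishes.
function fanOut(itemCount, fastMs, slowMs) {
  const start = Date.now();
  const source = Readable.from(Array.from({ length: itemCount }, (_, i) => i));
  const fast = timedSink(fastMs);
  const slow = timedSink(slowMs);
  source.pipe(fast);
  source.pipe(slow);

  const finished = (stream) =>
    new Promise((resolve, reject) => {
      stream.on("finish", () => resolve(Date.now() - start));
      stream.on("error", reject);
    });
  return Promise.all([finished(fast), finished(slow)]);
}

fanOut(10, 0, 20).then(([fastMs, slowMs]) =>
  console.log(`fast output finished after ${fastMs} ms, slow after ${slowMs} ms`)
);
```

Even though the fast output needs almost no time per item, it finishes only shortly before the slow one, because the shared source moves at the slow output's pace.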
The flow may be slow, but it can also drop to zero, which means your program gets stuck and stops doing any work. There are three possible reasons for this:
- The source does not flow: the data source stops pushing new data into the stream. For example, the server's HTTP response is not being sent due to network congestion.
- The output does not accept more data: for example, the database pool you're writing the output to is exhausted and we're waiting for a free connection.
- One of the transforms does not return data: for example, it returns a promise that is never resolved (due to an error).
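The third case is often the hardest to spot, because a promise that never settles produces no error at all. One way to make it visible, sketched below under the assumption that your transforms are plain async functions, is to race each call against a timer. The `withTimeout` and `guarded` helpers and the `buggyParser` example are hypothetical and not part of scramjet; in the program above you might then write something like `.parse(guarded(asyncParser, 5000, "asyncParser"))`.

```javascript
// Hypothetical helper: race a promise against a timer so a transform that
// never settles turns into a visible error instead of a silent stall.
function withTimeout(promise, ms, label) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} did not settle within ${ms} ms`)),
      ms
    );
  });
  // Whichever settles first wins; the timer is cleared either way.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Wrap an async transform function so every call is guarded by the timeout.
// Synchronous throws inside `fn` also become rejections.
const guarded = (fn, ms, label) => (chunk) =>
  withTimeout(Promise.resolve().then(() => fn(chunk)), ms, label);

// Example of the bug: the parser swallows the error and never rejects,
// so the returned promise stays pending forever.
const buggyParser = (line) =>
  new Promise((resolve) => {
    try {
      resolve(JSON.parse(line));
    } catch {
      // forgot to reject here
    }
  });

guarded(buggyParser, 100, "buggyParser")("{not json")
  .catch((err) => console.error(err.message)); // logs: buggyParser did not settle within 100 ms
```

A guarded transform fails loudly with a message naming the culprit, instead of leaving the whole stream silently at a flow rate of zero.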
Reading flow rate
Part of debugging the flow is finding the flow rate of our stream at every point along it. Each stream attempts to consume items from the operations before it.
Although `scramjet` does not provide a dedicated method for this, the flow rate can be read quite easily by inserting this simple `use` command:
```javascript
StringStream.from(request.get('https://example.org/data.json'))
  .parse(asyncParser)
  .do(asyncOperation)
  // --- here begins ---
  .use(stream => {
    let cnt = 0;

    const timer = setInterval(() => {
      console.log(`Flow rate ${cnt} entries per second...`);
      cnt = 0;
    }, 1000 /* you may want to adapt this to your needs */);

    // always remember to return the stream! Returning the counting stream keeps
    // it in the chain; a side branch that nobody reads can fill up and stall the flow.
    return stream
      .do(() => cnt++)
      .on("end", () => clearInterval(timer));
  })
  // --- here ends ---
  .filter(asyncFilter)
  .reduce(asyncReducer);
```
Depending on the flow rate you expect, adapt the time interval marked above. For example, when parsing a log file I'd leave it at 1000 milliseconds, but for a stream of comments 60000 milliseconds may be better (in that case, change "per second" in the message to "per minute").
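If you work with plain Node streams instead of scramjet, the same meter can be written as a pass-through `Transform`. This is a minimal sketch: `rateMeter` is a hypothetical helper, and unlike the snippet above it stops its timer in `flush`, that is once the stream has ended.

```javascript
const { Transform } = require("node:stream");

// Hypothetical helper: counts chunks passing through and reports the count
// every `intervalMs` milliseconds, passing every chunk on unchanged.
function rateMeter(intervalMs = 1000, report = console.log) {
  let cnt = 0;
  const timer = setInterval(() => {
    report(`Flow rate ${cnt} entries per ${intervalMs} ms...`);
    cnt = 0;
  }, intervalMs);
  timer.unref(); // don't keep the process alive just for the meter

  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      cnt++;
      callback(null, chunk);
    },
    flush(callback) {
      clearInterval(timer); // stop reporting once the stream has ended
      callback();
    },
  });
}
```

You would place it wherever you want to measure, for example `pipeline(source, parser, rateMeter(1000), destination, callback)`, and move it along the pipe to compare the rates at different points.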
Results:
- In a healthy situation, when the source pushes, the transforms return and the output pulls chunks, you would expect something like this:
```
Flow rate 122 entries per second...
Flow rate 169 entries per second...
Flow rate 118 entries per second...
Flow rate 201 entries per second...
```
The rate may vary, but the stream keeps flowing every second.
- If the source pushes and all transforms before our `use` command work, but something after it is stuck, the output will be something like this:
```
Flow rate 63 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
```
The `use` command saw some initial elements, but after that nothing happened, so the blockage is further down the stream.
- Lastly, if the source or one of the transforms before our `use` command is not working, the output will look like this:
```
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
Flow rate 0 entries per second...
```
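No chunks reach this point at all, which means the problem lies upstream: either the source is not pushing or one of the transforms before the `use` command is stuck.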
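Both failure patterns can be reproduced with a small sketch using Node's built-in streams rather than scramjet. A counter, standing in for the `use` meter, sits in the middle of the pipe, and either the element after it or the element before it never calls back. All helper names here are hypothetical.

```javascript
const { Readable, Transform, Writable } = require("node:stream");

// Pass-through transform that counts chunks; stands in for the `use` meter.
function counter(state) {
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      state.cnt++;
      callback(null, chunk);
    },
  });
}

// Hypothetical stand-ins for a stuck element: their callbacks are never called,
// like a promise that never resolves or a write waiting for a free connection.
const stuckTransform = () =>
  new Transform({ objectMode: true, transform() { /* never calls back */ } });
const stuckSink = () =>
  new Writable({ objectMode: true, write() { /* never calls back */ } });
const okTransform = () =>
  new Transform({ objectMode: true, transform(c, _e, cb) { cb(null, c); } });
const okSink = () =>
  new Writable({ objectMode: true, write(_c, _e, cb) { cb(); } });

// source -> (before) -> counter -> (after); `stuck` picks which side hangs.
// Resolves with the number of chunks the counter saw after `waitMs`.
function whereIsTheStall(stuck, waitMs = 100) {
  const state = { cnt: 0 };
  Readable.from(Array.from({ length: 1000 }, (_, i) => i))
    .pipe(stuck === "upstream" ? stuckTransform() : okTransform())
    .pipe(counter(state))
    .pipe(stuck === "downstream" ? stuckSink() : okSink());
  return new Promise((resolve) => setTimeout(() => resolve(state.cnt), waitMs));
}

whereIsTheStall("downstream").then((cnt) =>
  console.log(`downstream stall: ${cnt} of 1000 chunks seen, then nothing`)
);
whereIsTheStall("upstream").then((cnt) =>
  console.log(`upstream stall: ${cnt} of 1000 chunks seen`)
);
```

The downstream case reports a small, non-zero count (roughly what fits in the buffers) and then stops, while the upstream case reports zero, matching the second and third outputs above.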
Reading the stream graph
Each `scramjet` stream has a method called `graph`, which provides a list of the streams connected to the current instance. For now the list can only be read in one direction, but it will become more useful in future versions.
A short function like this helps find where a stream originates in our code:
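```javascript
function printStreamGraph(str) {
  str.graph(instances => {
    console.log(str.name);
    // `constructed` appears to hold the stack trace captured when each stream
    // was created; line 2 of that trace points at the code that created it
    instances.forEach(x => console.log(x.constructed.split("\n")[2]));
  });
}
```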
Calling this on a stream will result in something like this:
```
DataStream(18)
    at DataStream.map (index.js:581:15)
    at DataStream.filter (lib/loader.js:507:25)
    at DataStream.flatMap (lib/loader.js:637:17)
    at StringStream.parse (lib/source.js:805:12)
    at StringStream.from (index.js:588:7)
```
Now it's just a matter of finding your suspect.
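The `constructed` property used by `printStreamGraph` appears to hold a stack trace captured when the stream was created. The same idea can be applied to plain Node streams; here is a minimal sketch, where the `TracedPassThrough` class, its `origin` method and the `makeStage` function are hypothetical names, not part of scramjet or Node.

```javascript
const { PassThrough } = require("node:stream");

// Hypothetical class: a PassThrough that remembers where it was created.
class TracedPassThrough extends PassThrough {
  constructor(options) {
    super(options);
    // Capture a stack trace at construction time (property name mirrors
    // the `constructed` field used in printStreamGraph).
    this.constructed = new Error("constructed").stack;
  }

  // Line 0 of the trace is the error message, line 1 is this constructor,
  // line 2 is the code that called `new`.
  origin() {
    return this.constructed.split("\n")[2]?.trim();
  }
}

function makeStage() {
  return new TracedPassThrough({ objectMode: true });
}

const stage = makeStage();
console.log(stage.origin()); // a frame such as "at makeStage (<file>:<line>:<column>)"
```

Capturing a stack trace costs a little time per stream, so this is something to enable while debugging rather than leave on in production.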
Summary
I hope this was a good read. If you think there is something to correct or add, please feel free to raise a new issue on GitHub.
Project co-financed by the European Union from the European Regional Development Fund under the Knowledge Education Development Program. The project is carried out as part of the competition of the National Centre for Research and Development: Szybka Ścieżka.