Scramjet v4.21
I'm thrilled to announce the release of Scramjet v4.21 with two important additions:
DataStream..exec
- allows pushing a stream through the stdin/stdout of a spawned child process
DataStream..rate
- allows limiting the stream rate to fit requirements
Child process
Interoperability is one of the most important qualities of a well-designed framework, and it's hard not to agree that scramjet has been node.js oriented. Whenever there was a need to stream logs or other data into a scramjet pipeline, or to stream it out, it was pretty easy to use node's process.stdin and process.stdout and simply use shell pipes.
While working on a low-latency HLS server I found that there are quite a few cases where a process should be started and included in the pipeline as if it were simply another function in the flow. This led to a new method: exec. The idea behind it is to spawn a new process, pipe the input to the child's stdin, read the output from its stdout, and make all of this as automated as possible.
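To make those steps concrete, here is a minimal sketch of the same plumbing done by hand with node's child_process module. The helper names pipeThrough and collect are made up for this illustration and are not part of scramjet; it only shows the spawn, pipe-to-stdin, read-from-stdout sequence described above.

```javascript
const { spawn } = require("child_process");
const { Readable } = require("stream");

// Hypothetical helper (not a scramjet API): pushes `input` through a child process
// and returns the child's stdout as a readable stream.
function pipeThrough(input, command, args = []) {
    // stdin and stdout are pipes we control, stderr goes straight to our own stderr
    const child = spawn(command, args, { stdio: ["pipe", "pipe", "inherit"] });
    // the child may exit before reading everything, so don't crash on a broken pipe
    child.stdin.on("error", () => {});
    input.pipe(child.stdin);
    return child.stdout;
}

// Hypothetical helper: reads a whole stream into a single string.
async function collect(readable) {
    readable.setEncoding("utf8");
    let out = "";
    for await (const chunk of readable) out += chunk;
    return out;
}

// Usage, assuming a `sort` binary is available on the PATH:
// collect(pipeThrough(Readable.from(["b\n", "a\n"]), "sort")).then(console.log);
```

This is the kind of boilerplate that exec is meant to take off your hands, so that the external process becomes just one more step in the chain.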
Here's the result - let's consider an awk script from this StackOverflow question on moving average:
#!/usr/bin/awk -f
# n (the window size) is not set here and has to be passed in by the caller
BEGIN {
m=int((n+1)/2)
}
{L[NR]=$2; sum+=$2}
NR>=m {d[++i]=$1}
NR>n {sum-=L[NR-n]}
NR>=n{
a[++k]=sum/n
}
END {
for (j=1; j<=k; j++)
print d[j],a[j]
}
There's a better way of doing this with DataStream..window, but it's a script we want to execute in our flow. Here comes the StringStream..exec method:
DataStream.from(depthAndVelocityGenerator())
    .stringify(({depth, velocity}) => `${depth.toFixed(3)}\t${velocity.toFixed(3)}\n`)
    // produces lines of "0.000\t0.000\n", so we put that through our awk script
    .exec('./path/to/window.awk')
    // and bingo! here's the output of that command, based on the input we sent, let's parse it
    // (awk's print separates fields with a space by default, so split on any whitespace)
    .parse(line => line.trim().split(/\s+/))
    // assumed completion of the mapper: name the two columns
    .map(([entries, average]) => ({ entries, average }))
    .each(console.log)
    .run()
Now we can do the same without casting to StringStream by using a convenience method on DataStream:
DataStream.from(depthAndVelocityGenerator())
    // objects are serialized as set in options, then put through our awk script
    .exec("./path/to/window.awk", options)
    // and bingo! here's the output of that command, based on the input we sent
    .each(console.log)
    .run();
Where our options are:
parse
- scramjet module to transform the stream to string or buffer stream
stringify
- scramjet module to transform from string or buffer stream to wanted version
By default these perform a "jsonnd" (\n-delimited JSON) serialization, but you can use whatever you want here.
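For readers unfamiliar with newline-delimited JSON, the sketch below shows what such a serialization looks like in plain JavaScript. The functions toNdjson and fromNdjson are hypothetical names used only for illustration; this is not scramjet's own implementation.

```javascript
// Hypothetical helpers (not scramjet code) illustrating \n-delimited JSON.

// Serializes each object as one JSON document per line.
function toNdjson(objects) {
    return objects.map((obj) => JSON.stringify(obj) + "\n").join("");
}

// Parses the text back, one object per non-empty line.
function fromNdjson(text) {
    return text
        .split("\n")
        .filter((line) => line.trim() !== "")
        .map((line) => JSON.parse(line));
}

const wire = toNdjson([{ depth: 1, velocity: 2 }, { depth: 3, velocity: 4 }]);
const back = fromNdjson(wire);
```

Because every record sits on its own line, the process on the other end can handle the data line by line without buffering the whole stream.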
With exec you can start another process, written in any language and doing anything you like, outside of the node.js environment. This release is the start of a long process of making scramjet interoperate with different languages and environments.
There's one important piece missing in 4.21.0, which will be released no later than 4.22.0: BufferStream..exec. The tests showed that we need to put some extra work into this, but pushing native buffers to stdin could open up new uses in real-time binary data processing (video, images etc.). We're looking into this and will keep everyone posted.
Here's some info on method signatures and formal documentation:
Rate limiting
Another addition is much simpler, but may come in quite handy. One of our users contacted me about the need to limit the number of requests they made each second so as not to exceed API limits. Their case was pretty much like this:
const fs = require("fs");
const { StringStream } = require("scramjet");

StringStream.from(fs.createReadStream("users-data.csv"))
    .CSVParse()
    .assign(async ({ email }) => {
        try {
            const response = await fetch(
                "https://partner.example.org/api/v1/",
                {
                    method: "POST",
                    body: JSON.stringify({ email }),
                }
            );
            // fetch resolves to a Response, the payload has to be read from its body
            const { userId, banStatus } = await response.json();
            return { newUser: false, userId, banned: banStatus < 1 };
        } catch (e) {
            return { newUser: true };
        }
    })
    .use("./update-user-db")
    .run();
However, they found no means of limiting the flow rate and kept hitting the API limits. We initially helped them by creating a rate-limiting module, but then decided it should be a feature of scramjet after all. So with the release of scramjet v4.21 there's a simple way to do that by adding a single line, .rate(10). Here it is in the code:
StringStream.from(fs.createReadStream("users-data.csv"))
    .CSVParse()
    .rate(10) // objects per second
    .assign(async ({ email }) => {
        try {
            const response = await fetch(
                "https://partner.example.org/api/v1/",
                {
                    method: "POST",
                    body: JSON.stringify({ email }),
                }
            );
            // fetch resolves to a Response, the payload has to be read from its body
            const { userId, banStatus } = await response.json();
            return { newUser: false, userId, banned: banStatus < 1 };
        } catch (e) {
            return { newUser: true };
        }
    })
    .use("./update-user-db")
    .run();
The stream flow will be throttled so that it keeps to the specified rate.
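To show the general idea of throttling a flow, here is a minimal sketch of a rate limiter written as an async generator. The name limitRate and the scheduling approach are assumptions made for this illustration; scramjet's actual implementation of rate may work differently.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper (not scramjet code): yields items from `source`
// no faster than `perSecond` items per second.
async function* limitRate(source, perSecond) {
    const interval = 1000 / perSecond; // minimum spacing between items in ms
    let nextSlot = Date.now();         // earliest time the next item may pass
    for await (const item of source) {
        const wait = nextSlot - Date.now();
        if (wait > 0) await sleep(wait);
        // schedule the following slot relative to whichever is later:
        // the planned slot or the actual time (if the source itself was slow)
        nextSlot = Math.max(Date.now(), nextSlot) + interval;
        yield item;
    }
}

// Usage: at most 10 items per second from any sync or async iterable.
async function demo() {
    for await (const n of limitRate([1, 2, 3], 10)) console.log(n);
}
```

The first item passes immediately and every later one waits for its slot, so a slow source is never delayed further while a fast one is held back.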
Here's the exact usage of the new method: DataStream..rate