A small fix introduced today makes your life easier if you happened to use one of Scramjet's multithreaded functions, namely:

* `MultiStream.cluster`
* `DataStream.distribute`

Underneath, these methods start workers ready to execute any stream transform you like. Each worker waits for a control message that is used to recreate the transform in the worker. Once the control message is received, the worker waits until the transformed stream ends, and if there's no other stream processed on the same worker, it shuts down cleanly.
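
For illustration, the worker side of that handshake could look roughly like the sketch below. This is a hypothetical, simplified example of the described lifecycle - the message shape and the `transforms` registry are invented for the sketch, and it is not Scramjet's actual worker code:

```javascript
// worker.js - a hypothetical sketch of a worker awaiting a control message
const { parentPort } = require('node:worker_threads');
const { Transform } = require('node:stream');

// stands in for resolving a transform module named in the control message
const transforms = {
    'upper-case': () => new Transform({
        transform(chunk, encoding, done) { done(null, String(chunk).toUpperCase()); }
    })
};

let activeStreams = 0;

parentPort.on('message', ({ transformName, chunks }) => {
    // the control message is used to recreate the transform in the worker
    activeStreams++;
    const stream = transforms[transformName]();

    stream.on('data', out => parentPort.postMessage(String(out)));
    stream.on('end', () => {
        // the worker waits until the transformed stream ends; if no other
        // stream is processed on the same worker, it shuts down cleanly
        if (--activeStreams === 0) process.exit(0);
    });

    for (const chunk of chunks) stream.write(chunk);
    stream.end();
});
```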

The scramjet side, on the other hand, either pushes out MultiStream members to separate workers, or, in a single DataStream's `distribute`, uses an affinity function to choose which workers should receive which chunks. These two ways of distributing work as follows:

MultiStream way


```javascript
const streams = await getArrayOfStreams();

new MultiStream(streams)
    .smap(stream => stream.use('./checks-and-filters'))
    // the line below executes in workers
    .cluster(stream => stream.use('./cpu-heavy-ops'))
    .mux()
    // here we're back in the main thread
```

DataStream way


```javascript
DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
        // this method makes sure that chunks make their way to the right workers
        chunk => chunk.id % 8,
        // the line below executes in workers
        stream => stream.use('./cpu-heavy-ops')
    )
    .mux()
    // here we're back in the main thread with one stream
```

So what went wrong? Well... in short, it appears that all our tested cases covered two situations:

We found, however, that workers that are never utilized wait forever and keep the main thread from exiting even though all the other refcounts are zeroed. Take the example above: if you run it on a small VPS with 4 CPU threads available, it will exit correctly - but run it on a bigger machine and it will do all its work, yet never exit. Here's why:


1. `distribute()` -> spawn 2 x available threads of workers: 0, 1, 2, ... 7, ... 31.
2. `resume()` -> we start flowing the elements, the affinity function returns ids:
    * {id: 0} -> worker 0
    * {id: 1} -> worker 1
    * {id: 2} -> worker 2
    * {id: 3} -> worker 3
    * {id: 4} -> worker 4
    * {id: 5} -> worker 5
    * {id: 6} -> worker 6
    * {id: 7} -> worker 7
    * {id: 8} -> worker 0
    * {id: 9} -> worker 1
    * ...
3. Actual distribution happens...
    * Workers 0 through 7 have received code to execute and were unreffed.
    * Workers 8 through 31 are still awaiting code to run.
4. `end()` -> the stream ends.
    * Workers 0 through 7 exit silently.
    * Workers 8 through 31 are still awaiting code to run.
5. The program never ends.
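
The effect described in step 5 can be reproduced with plain Node.js `worker_threads`, without Scramjet at all: a spawned worker that was never `unref()`ed keeps the parent process alive. A minimal sketch of that mechanism (illustrative only, not Scramjet's code):

```javascript
const { Worker } = require('node:worker_threads');

// spawn 32 workers that just keep their event loops busy,
// standing in for workers "awaiting code to run"
const workers = Array.from({ length: 32 }, () =>
    new Worker('setInterval(() => {}, 1000)', { eval: true }));

// pretend workers 0..7 received work: unref() means they no longer
// keep the main thread from exiting
workers.slice(0, 8).forEach(worker => worker.unref());

// workers 8..31 were never unref()ed, so this process never exits
console.log('main thread is done, but the process keeps running');
```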

In the patch we assumed that a worker that is not utilized for a second should also be allowed to exit. Now the mentioned workers 8 through 31 will still be spawned, but stopped after a second if they receive nothing to execute. You may want to avoid even that extra second of idle workers - in such a case remember that both `cluster` and `distribute` can be called with options as the last parameter, where you can provide the number of threads you wish to start with, for example:


```javascript
DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
        chunk => chunk.id % 8,
        stream => stream.use('./cpu-heavy-ops'),
        // only 8 workers will start
        { threads: 8 }
    )
    .mux()
```
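
The snippet above uses `distribute`, but as mentioned, `cluster` accepts the same kind of options object as its last parameter, so the `MultiStream` variant should look roughly like this (a sketch based on that sentence, reusing the earlier example):

```javascript
new MultiStream(streams)
    .smap(stream => stream.use('./checks-and-filters'))
    // only 8 workers will start for the clustered transform
    .cluster(stream => stream.use('./cpu-heavy-ops'), { threads: 8 })
    .mux()
```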