A small fix introduced today makes your life easier if you happened to use one of Scramjet's multithreaded functions, namely:
Underneath the methods start workers ready to execute any stream transform you like and wait for a control message that is used to recreate the transform in the worker. After the control message is received the worker waits until the transformed stream ends and if there's no other stream processed on the same worker it shuts down cleanly.
The scramjet side on the other hand pushes out MultiStream
members to separate workers or in a single DataStream..distribute
an affinity function is used to choose which workers should receive which chunks. These two ways of distributing work as follows:
MultiStream way
_10 const streams = await getArrayOfStreams();_10_10 new MultiStream(stream)_10 .smap(stream => stream.use('./checks-and-filters'))_10 // the line below executes in workers_10 .cluster(stream => return stream.use('./cpu-heavy-ops'))_10 .mux()_10 // here we're back in the main thread
DataStream way
_10 DataStream.from(streamGenerator)_10 .use('./checks-and-filters')_10 .distribute(_10 // this method makes sure that chunks make their way to the right workers_10 chunk => chunk.id % 8,_10 // the line below executes in workers_10 stream => return stream.use('./cpu-heavy-ops')_10 )_10 .mux()_10 // here we're back in the main thread with one stream
So what went wrong? Well... in short, it appears that all our tested cases covered two situations:
We found however that the workers that are not utilized wait forever and hold the main thread from exiting even though all the other refcounts are zeroed. If we check the above example - if you run it on a small VPS with 4 CPU threads available it'll exit correctly - but run it on a bigger machine and it will do all it's work, but never exit. Here's why:
_201. `distribute()` -> spawn 2 x available threads of workers: 0, 1, 2, ... 7, ... 31._202. `resume()` -> we start flowing the elements, the affinity functions returns ids:_20 * {id: 0} -> worker 0_20 * {id: 1} -> worker 1_20 * {id: 2} -> worker 2_20 * {id: 3} -> worker 3_20 * {id: 4} -> worker 4_20 * {id: 5} -> worker 5_20 * {id: 6} -> worker 6_20 * {id: 7} -> worker 7_20 * {id: 8} -> worker 0_20 * {id: 9} -> worker 1_20 ..._203. Actual distribution happens..._20 * Workers 0 through 7 have received a code to execute and were unreffed._20 * Workers 8 through 31 are still awaiting code to run._204. `end()` -> the stream ends_20 * Workers 0 through 7 exit silently_20 * Workers 8 through 32 are still awaiting code to run._205. The program never ends.
In the patch we assumed that a worker that's not utilized for a second will also be allowed to exit. Now the mentioned workers 8 through 32 will be spawned for a second, but then stopped after execution. You may actually need that second of free time to be lower - in such case remember that both cluster
and distribute
can be called with options
as the last parameter where you can provide the number of threads you wish to start with, for example:
_10 DataStream.from(streamGenerator)_10 .use('./checks-and-filters')_10 .distribute(_10 chunk => chunk.id % 8,_10 stream => return stream.use('./cpu-heavy-ops'),_10 // only 8 workers will start_10 {threads: 8}_10 )_10 .mux()