Node.js child_process, cluster, worker_thredsを確認した

Node.jsにおけるいわゆる並行処理に関して再確認。
Node.jsは一般的にシングルスレッド、非同期IOとして認識されている。
これは言語としてマルチスレッドによる複雑性を排除した処理を選択した経緯があるらしい。

v11.7.0worker_threadsが取り込まれたことで、スレッド処理が可能になった。

https://github.com/nodejs/node/pull/25361

ちなみにv11.7.0 以前では worker_threads の利用の際には--experimental-workerオプション付きで利用できていた。

並行処理

EventLoop

Event Loop Explained

IOバウンドな処理を考慮した場合、処理の実行タイミングを変更することで、回避することができる。
一般的なEventloopを利用したバックグラウンド処理がそれにあたる。
負荷の高いコードを分割して、setImmediateを活用し、IOの後で評価することができる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
'use strict';
const num = [],
start = 0,
end = 50000;

process.on('count', function(c) {
if (c > end) {
const result = num.reduce(function(prev, current, _, __) {
return prev + current;
});
console.log(result);
return;
}
num.push(c);

setImmediate(function() {
process.emit('count', ++c);
});
});

process.emit('count', start);

child_process

CPUバウンドな処理が渡されるとJavaScriptはシングルスレッドなので、WebアプリケーションだとUIがフリーズし、他の処理がキューイングされる。
そこで、CPUバウンドな処理の場合はマルチプロセス機構を使い処理することも可能。
マルチプロセスなので子プロセスは独自のメモリを持っているが、プロセス間に共有メモリはない。
また、プロセス間のメッセージングはIPCとなる。

https://github.com/nodejs/node/blob/2c84f6e75cc513fe6e958f0489d104ee883db232/lib/child_process.js#L117

起動時にモジュール、を割り当て(その他のオプションも)、プロセスを起動する。

https://github.com/nodejs/node/blob/2c84f6e75cc513fe6e958f0489d104ee883db232/lib/child_process.js#L59

起動時のメモリ割り当てが高価な点やリアルタイム性から利用できるシーンが決まってくる。

複数プロセス利用はjest-workerなどで利用されているnode-worker-farmのモジュール利用することが現在のところ良さそう。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// index.js
let workerFarm = require('worker-farm'),
process = require('process'),
workers = workerFarm(require.resolve('./child'));

let num = [];
const start = 0,
end = 100;

const child_prosess = require('os').cpus().length;

const range = (start, end, separate) => {
const result = [];
const count = (end - start) / separate;

for (let i = 0; i < separate; i++) {
let tmp = {};
if (i == 0) {
tmp = {
start,
end: count,
};
} else {
tmp = {
start: result[i - 1].end + 1,
end: result[i - 1].end + count,
}
}
result.push(tmp);
}
return result;
}

const ranges = range(start, end, child_prosess);
ranges[ranges.length - 1].end += (end % child_prosess);

for (let i = 0; i <= child_prosess; i++) {
workers(ranges[i], function (err, outp) {
if (err) {
console.error(err);
}
num = num.concat(outp)
if (i == child_prosess - 1) {
workerFarm.end(workers);
const result = num.reduce(function(prev, current, _, __) {
return prev + current;
});
console.log(result);
process.on('exit', function() {
process.exit(0);
});
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// child.js
'use strict'

const num = [];
const count = function(start, end) {
for(let i = start; i <= end; i++) {
if (num.some(function(v) { return v === i }) === false) {
num.push(i)
}
}
}

module.exports = function (inp, callback) {
if (!inp) { return }
const {start, end} = inp;
count(start, end);
console.log(`${process.pid} running`);
callback(null, num);
}

cluster

child_processと同じく、子プロセスをforkする。

https://github.com/nodejs/node/blob/2c84f6e75cc513fe6e958f0489d104ee883db232/lib/internal/cluster/master.js#L162

https://github.com/nodejs/node/blob/2c84f6e75cc513fe6e958f0489d104ee883db232/lib/internal/cluster/master.js#L102

child_processでは実行モジュールなどをオプションとして渡す必要があるが、
clusterは親プロセスと同じモジュールの先頭から実行するため不要になる。

https://github.com/nodejs/node/blob/2c84f6e75cc513fe6e958f0489d104ee883db232/lib/internal/cluster/master.js#L284

Nodejsのアプリケーション自体を並列化する。
clusterモジュールが利用できる。
マスタープロセスにリクエストが来ると、ワーカープロセスに処理を委譲する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const cluster = require('cluster');

let num = [];
const start = 0,
end = 100;

const count = (start, end) => {
const tmp = [];
for(let i = start; i <= end; i++) {
if (num.some((v) => { return v === i }) === false) {
tmp.push(i)
}
}
return tmp;
}

const range = (start, end, separate) => {
const result = [];
const count = (end - start) / separate;

for (let i = 0; i < separate; i++) {
let tmp = {};
if (i == 0) {
tmp = {
start,
end: count,
};
} else {
tmp = {
start: result[i - 1].end + 1,
end: result[i - 1].end + count,
}
}
result.push(tmp);
}
return result;
}

const child_prosess = require('os').cpus().length;

const messageReceive = () => {
return new Promise((resolve, reject) => {
let finichProcess = 0;
cluster.on('message', (worker ,message, handle) => {
finichProcess++;
num = num.concat(message)
if (finichProcess === child_prosess) {
resolve(num);
}
});
});
}

const mainFunction = () => {
const ranges = range(start, end, child_prosess);
ranges[ranges.length - 1].end += (end % child_prosess);
for (let i = 0; i < child_prosess; i++) {
let worker = cluster.fork();
worker.on('online', () => {
worker.send(ranges[i]);
});
}

const promise = messageReceive();
promise.then((num) => {
const result = num.reduce((prev, current, _, __) => {
return prev + current;
});
console.log(result);
});
}

const childProcessFunc = () => {
process.on('message', (data) => {
const response = count(data.start, data.end);
process.send(response);
cluster.worker.disconnect();
});
}

(async () => {
if (cluster.isMaster) {
mainFunction();
} else {
childProcessFunc();
}
})();

マルチスレッド

マルチプロセスを利用した場合は、プロセス生成時に起動時に大量のメモリが消費される。
そこで共有メモリを利用し、マルチプロセスより軽量なマルチスレッドを使うことになる。

https://github.com/nodejs/node/blob/master/lib/worker_threads.js

Workerコンストラクタ。実行するファイルのパスが引数。

https://github.com/nodejs/node/blob/ba4df925eb7143606d5a57f49e4ecb179dd7743b/lib/internal/worker.js#L51

isMainThreadでマインスレッドか判定。
parentPort.postMessage()メインスレッドにメッセージング。
IPCでのメッセージングではない。

https://github.com/nodejs/node/blob/master/lib/internal/worker.js#L106

メッセージの処理は以下にまとまっている

https://github.com/nodejs/node/blob/master/lib/internal/worker/io.js
https://github.com/nodejs/node/blob/8375c706ad51a399451e4f43b075f3795c440dad/src/node_messaging.cc#L802

HTML structured clone algorithmと互換性のある形式のものが転送される。
https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#Supported_types

workerData JavaScriptの値で、Workerコンストラクタに渡されたデータのクローン。

https://github.com/nodejs/node/blob/ba4df925eb7143606d5a57f49e4ecb179dd7743b/lib/internal/worker.js#L116

関連するN-APIは以下なのかな?

workser threadから非同期的にJavaScript関数を呼び出す。

workser threadが即時終了状態か

スレッドセーフ関数のキューがいっぱいになった時にブロックするか

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
'use strict';
const {
Worker,
isMainThread,
parentPort,
workerData
} = require('worker_threads');

let num = [];
let start = 0;
let end = 50000;

const count = function(start, end) {
for(let i = start; i <= end; i++) {
if (num.some(function(v) { return v === i }) === false) {
num.push(i)
}
}
}

const range = (start, end, separate) => {
const result = [];
const count = (end - start) / separate;

for (let i = 0; i < separate; i++) {
let tmp = {};
if (i == 0) {
tmp = {
start,
end: count,
};
} else {
tmp = {
start: result[i - 1].end + 1,
end: result[i - 1].end + count,
}
}
result.push(tmp);
}
return result;
}

if (isMainThread) {
const threadCount = +process.argv[2] || 2,
ranges = range(start, end, threadCount),
threads = new Set();

ranges[ranges.length - 1].end += (end % threadCount);

for (let i = 1; i < threadCount; i++) {
threads.add(new Worker(__filename, { workerData: { start: ranges[i].start, end: ranges[i].end }}));
}

threads.add(new Worker(__filename, { workerData: { start: ranges[0].start, end: ranges[0].end }}));

for (let worker of threads) {
worker.on('error', (err) => { throw err; });
worker.on('online', () => {
console.log(`Thread running start ID: ${worker.threadId} ...`);
});
worker.on('exit', () => {
console.log(`Total thread, ${threads.size} running ...`);
threads.delete(worker);
if (threads.size === 0) {
const result = num.reduce(function(prev, current, _, __) {
return prev + current;
});
console.log(result)
}
})
worker.on('message', (msg) => {
num = num.concat(msg);
});
}
} else {
count(workerData.start, workerData.end);
parentPort.postMessage(num);
}

参考ページ

Node.js v10.15.1 Documentation Worker Threads

Comments