Plan 9とGo言語のブログ

主にPlan 9やGo言語の日々気づいたことを書きます。

NodeJS+TypeScriptのchild_processモジュールで非同期にコマンドをパイプする

さいきん業務ではReactを使っていて、フロントに対する苦手意識も減ってきたので、いろいろ実験するための環境として自サイトをNextJSのSSGで生成するようにしたのですが、このとき、もともとはPlan 9mkfileでリソースを管理していた事情もあって、NodeJS+TypeScriptで複数の外部コマンドを扱う必要がありました。

NodeJSで非同期実行する場合、単純なコマンドならchild_processモジュールのspawnを使えばいいだけなのですが、

  • コマンドにファイル名を渡す以外の方法で標準入力にデータを流したい
  • コマンドの標準出力を、別コマンドの入力にしたい

などの場合に、どうすればいいのか分からなかったので調べました。ただ、最後にも書いているけどchild_processモジュールだけで実装するのは意外と大変なので、execaのようなライブラリを使ったほうがいいだろうと思います。

ファイルを指定して単一のコマンドを実行する

child_processモジュールにはコマンドを実行するための関数が複数ありますが、非同期で実行する場合はspawnを使います。コマンドの実行結果はStreamになっているので、以下のように書くと標準出力の内容を取得できます。

import { spawn } from "child_process";

async function main() {
    const p = await spawn("ls", ["-al"], {
        stdio: ["pipe", "pipe", "inherit"],
    });
    for await (const s of p.stdout){
        console.log(`${s}`);
    }
    const status = await new Promise((resolve, reject) => {
        p.on("close", resolve);
    });
    console.log("Status:", status);
}

main();

spawnstdioオプションは[(stdin), (stdout), (stderr)]の順になっていて、ここでpipeを与えるとプログラムから読み書きできるようになり、inheritの場合は親プロセスのファイルディスクリプタを共有します。上記コード例の場合は、stdinstdoutpipeなのでプログラムから読み書きできる状態でプロセスを実行しますし、stderrのところがinheritなので、コマンドのエラーは標準エラー出力にそのまま流れます。ただし、ここで実行しているlsコマンドは標準入力を読まないので、stdinpipeになっている意味はありません。

標準エラー出力を読む場合は、stdioオプションをpipeに設定して、p.stdoutの代わりにp.stderrを読めば得られます。

任意のストリームをコマンドに渡したい

コマンドの扱う入力がファイルで足りる場合はspawnだけで良いのですが、コマンドに流したいデータは必ずしもファイル名で与えられるとは限りません。例えばHTTPで取得したレスポンスかもしれないし、別のコマンドを実行した結果かもしれません。具体的には、

createReadStream("file").pipe(spawn("sed", ...)).pipe(spawn("wc", ...))

のようにStreamを繋げたい場合は、意外と素直に書くことができませんでした。spawnstdioオプションはStreamを受け取れるので、

import { spawn } from "child_process";
import { createReadStream } from "fs";

async function main() {
    const fin = createReadStream("package.json", "utf-8");
    const p = await spawn("wc", ["-l"], {
        stdio: [fin, "pipe", "inherit"],
    });
    ...
}

とすれば実現できるかというと、このコードはERR_INVALID_ARG_VALUEで失敗します。

$ npx ts-node main.ts 
TypeError [ERR_INVALID_ARG_VALUE]: The argument 'stdio' is invalid. Received ReadStream {
  path: 'package.json',
  flags: 'r',
  mode: 438,
  fd: null,
  start: undefined,
  end: Infinity,
  pos: undefine...
    at new NodeError (node:internal/errors:371:5)
    at node:internal/child_process:1042:13
    at Array.reduce (<anonymous>)
    at getValidStdio (node:internal/child_process:967:11)
    at ChildProcess.spawn (node:internal/child_process:354:11)
    at Object.spawn (node:child_process:698:9)
    at main (/home/lufia/Downloads/x/main.ts:6:18)
    at Object.<anonymous> (/home/lufia/Downloads/x/main.ts:18:1)
    at Module._compile (node:internal/modules/cjs/loader:1101:14)
    at Module.m._compile (/home/lufia/Downloads/x/node_modules/ts-node/src/index.ts:1311:23) {
  code: 'ERR_INVALID_ARG_VALUE'
}

どうやらcreateReadStreamしただけではfinが内部に持っているファイルディスクリプタがまだnullのようです。fin.on("data", ...)すればファイルディスクリプタがセットされそうでしたが、それだと本当はコマンドに流したいデータがコールバック関数に流れてしまうので、意図していた動作にはなりません。

Transformを実装してパイプする

Stramには、読み込み用のReadableや書き込み用のWritable以外にも、ストリームを流れるデータを加工して流すためのTransformが用意されています。上でやりたかったコマンドのパイプも、このTransformを実装すると実現できます。Transformの実装方法は、コンストラクタに変換するための関数を渡す方法と、クラスとして実装する方法の2種類ありますが、ここでは前者で実装しました。

import { ChildProcess, spawn } from "child_process";
import { createReadStream } from "fs";
import { Transform } from "stream";

function createTransform(p: ChildProcess): Transform {
    const data: string[] = [];
    p.stdout?.on("data", s => {
        data.push(s); // 実行したコマンドの標準出力を貯めておく
    });

    const t = new Transform({
        // 定期的に呼ばれるので貯めていたデータを流す
        transform: (s, encoding, callback): void => {
            // 前プロセスから届いたデータをコマンドに流す
            p.stdin?.write(s);

            // コマンドの結果を次のストリームに渡す
            while(data.length > 0)
                t.push(data.shift());
            callback();
        },
        final: async (callback): Promise<void> => {
            // コマンドが終わるのを待つ
            p.stdin?.end();
            const status = await new Promise((resolve, reject) => {
                p.on("close", resolve);
            });
            if(status !== 0)
                throw new Error(`${p.spawnfile}: exit with ${status}`);

            // p.stdin.end()の後で貯まったデータを次のストリームへ全部流す
            while(data.length > 0)
                t.push(data.shift());
            callback();
        },
    });
    return t;
}

// 使用例
async function main() {
    const fin = createReadStream("package.json", "utf-8");
    const p = spawn("grep", ["ts"], {
        stdio: ["pipe", "pipe", "inherit"],
    });
    const t = createTransform(p);
    const stream = fin.pipe(t);
    try {
        for await (const s of stream){
            console.log(`${s}`);
        }
    } catch(e) {
        console.error(e.message)
    }
}

pipeではなくpipelineを使う

ところで、pipeでストリームを接続した場合、ストリームのうちどれか1つでもエラーが発生すると、残りのプロセスが回収されない問題があります。イベントを監視して適切な対応をすればいいのですが、接続するプロセスが多くなると面倒なので、今はpipelineを使うとよいそうです。

ただし、pipeと異なりpipelineは戻り値がPromise<void>になっているため、これまでのようにfor...ofを使った出力結果の取得ができません。代わりにpipelineでは、最後の引数にWritableなストリームを受け取るようになっていて、例えばファイルに書き出す場合はここへcreateWriteStreamで生成したストリームを渡します。

とはいえ、ここではコマンドの実行結果をプログラムから扱いたいだけなので、ファイルに書き出す必要はありません。流れてきたデータをメモリ上で保持するためのWritableなストリームを探したのですが見つからなかった*1ので、以下のようにWritableMemoryStreamを用意して対応しました。

import { ChildProcess, spawn } from "child_process";
import { createReadStream } from "fs";
import { Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

async function main() {
    const fin = createReadStream("package.json", "utf-8");
    const grep = spawn("grep", ["ts"], {
        stdio: ["pipe", "pipe", "inherit"],
    });
    const wc = spawn("wc", ["-l"], {
        stdio: ["pipe", "pipe", "inherit"],
    });
    const w = new WritableMemoryStream();
    try {
        await pipeline(fin, createTransform(grep), createTransform(wc), w);
    } catch(e) {
        console.error(e.message)
    }
    console.log(w.toString());
}

function createTransform(p: ChildProcess): Transform {
    const data: string[] = [];
    p.stdout?.on("data", s => {
        data.push(s);
    });

    const t = new Transform({
        transform: (s, encoding, callback): void => {
            p.stdin?.write(s);
            while(data.length > 0)
                t.push(data.shift());
            callback();
        },
        final: async (callback): Promise<void> => {
            p.stdin?.end();
            const status = await new Promise((resolve, reject) => {
                p.on("close", resolve);
            });
            if(status !== 0)
                throw new Error(`${p.spawnfile}: exit with ${status}`);
            while(data.length > 0)
                t.push(data.shift());
            callback();
        },
    });
    return t;
}

class WritableMemoryStream extends Writable {
    private data: string[];

    constructor() {
        super();
        this.data = [];
    }

    _write(data: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
        this.data.push(data);
        callback();
    }

    toString(): string {
        return this.data.join("");
    }
}

Transformと同じように、Writableのコンストラクタを使ってもよかったけれど、toStringで最終的な結果を取得する方が自然に思えたのでクラスとして実装しています。

感想

パイプの実装なんてCで何度も書いたことがあるし、spawnインターフェイスがやりたいことを満たしてそうだったのもあって、油断してchild_processだけで対応してしまったけれど、やってみると意外と大変でした。もっとうまい実装方法はあるかなとは思いつつ、個人的には、次に同じ処理が必要ならexecaのようなライブラリを使うと思います。

*1:npmにmemorystreamはあるけどメンテナンスされてなさそうだった