Issei's Blog

[Node.JS] StreamAPIを使ったCSVの書き読み出し

今回は、Stream使って配列をCSVにして保存したり、CSVから配列を読み込んだりとかを業務で使ったのでそれの共有です。

今回やること

  • data.write(['a','b'])みたいに書いたらファイルにa,b\nみたいに追加されて欲しい。
  • 逆にファイルにa,b\nって書いてあったら、['a','b']みたいに1行づつの情報が欲しい。

そもそもStremAPIとは?

streamAPIの素晴らしさを知らない人のために少し解説します。 すでに使ってる人は飛ばしても大丈夫です。

公式リンク Stream API 曰く

streamはデータストリームをするための抽象的インターフェースです。

A stream is an abstract interface for working with streaming data in Node.js.

YoutTubeとかでは動画一気に読み込むとクソ重くなるからちょっとづつ読み込むようになっていますが、これがデータストリーミング。 でもこれは動画だけの話ではありません。 例えば、配列をtxtファイルに1列に保存したいとかってよくあると思います。 その時に、配列を文字列に変換して一気にファイルに保存しちゃうとデカイ配列を変換すると変換と保存に時間とメモリを食われてあまりいい手ではありません。 そこでStreamAPIです。 これを使うと、1要素ごとに保存させたりができるのですごく楽です。しかもPromiseとかコールバックは使わない点も良き 例えば以下のコードのようになります。

import * as fs from 'fs'
import * as path from 'path'

const DATA_PATH = path.join(__dirname, '../data/data.txt')
const dataWriteStream = fs.createWriteStream(DATA_PATH)

const arr = ['Hello', 'World', 'Hoge']

arr.forEach(val => dataWriteStream.write(val + '\n'))

fsはファイルの読み書き取りでよく使われるモジュールですが、createWriteStreamでファイルを書き出すstreamを作ります。そしてwrite関数にファイルの末尾に追加したい文字列を入れるだけです。

逆に読み取るときは以下になります。

import * as fs from 'fs'
import * as path from 'path'

const DATA_PATH = path.join(__dirname, '../data/data.txt')
const dataReadStream = fs.createReadStream(DATA_PATH, { encoding: 'utf-8' })

dataReadStream.on('data', chunk => console.log(chunk))

on関数の第一引数にdataを入れて1行づつ読み取り、成功した時にコールバック関数を実行します。

配列をCSVに保存する方法

さてここからが本題です。 具体的に今回作るstreamの流れは

1行分の配列のデータを渡す(['a','bb']みたいな) ↓ そのデータをCSV形式の文字列に変換 ↓ その文字列をファイルに保存

みたいな形です。 今回CSV文字列に変換するのにはcsv-stringifyというモジュールを使います。

これを実装するためにはまずWritableというクラスを継承したクラスを作ります。 そして、write関数が呼ばれた時にそのデータをcsv-stringifyに渡す。 そしてその結果をwriteStreamでファイルに追加みたいな感じです。

コードは以下になります。

import * as fs from 'fs'
import { Writable, Readable } from 'stream'
import * as stringify from 'csv-stringify'

export class WritableCSVStream extends Writable {
  private stringifier: stringify.Stringifier
  private writeStream: fs.WriteStream

  constructor(path: string, options: stringify.Options = {}) {
    super({ objectMode: true })
    this.writeStream = fs.createWriteStream(path)
    this.stringifier = stringify(options)

    this.stringifier
      .on('data', chunk => {
        this.writeStream.write(chunk)
      })
      .on('error', err => {
        this.destroy(err)
      })
    this.writeStream.on('error', err => this.destroy(err))
  }

  _write(chunk: any, encoding: string, next: (error?: Error) => void) {
    this.stringifier.write(chunk, next)
  }

  _final(next: () => void) {
    this.writeStream.close()
    this.stringifier.end()
    next()
  }

  _destroy(err?: Error, next?: (err?: Error) => void) {
    this.writeStream.close()
    this.stringifier.end()
    next()
  }
}

_writeにはwrite関数が呼ばれた時の動作、_destroyはエラー終了した時、_finalは終了時に呼ばれます。 closeend関数はストリームを終了させる役割を持ちます。 そして一番大事なのは、コンストラクターの中身です。 objectModetrueにすることでwrite関数にオブジェクトや配列を入れても大丈夫になります。 そしてオブジェクトをCSV形式に変換するstringifierとファイルに出力するwriteStreamon関数を使って繋げます。 そして_write関数でthisstringifierを繋げます。

CSVファイルの読み出し

CSVファイルの読み出しも上と同じようにもできるのですが、pipe関数でもっと簡単に書けます。 pipe関数はストリームを別のストリームにデータを受け渡しをできるようにします。

import * as parse from 'csv-parse'
export const createCSVReadStream = (
  path: string,
  options: parse.Options = {}
) => fs.createReadStream(path).pipe(parse(options))

これでfs.createReadStreamからファイル1行づつ読み出し、それをparseでオブジェクトに変換するようになってます。

終わりに

最初はCSV書き出しの方ももっと簡単に書きたかったのですが、stringify部分がうまくいかずにクラスを実装してみました。データをjsonで保存するのもstreamでできたりするのですが、暇だったらその記事も書きます。