Node.js 스트림 체이닝으로 대용량 CSV 파싱 최적화

문제 상황

데이터 분석팀에서 10GB가 넘는 CSV 파일을 DB로 마이그레이션하는 작업이 필요했다. 기존 방식은 fs.readFileSync로 전체를 읽어 처리했는데, 메모리 부족으로 프로세스가 죽는 문제가 발생했다.

Stream 기반 처리

Node.js의 Stream API를 사용해 청크 단위로 데이터를 처리하도록 변경했다.

const fs = require('fs');
const { pipeline } = require('stream');
const csvParser = require('csv-parser');
const { Transform } = require('stream');

class DBWriter extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.batch = [];
    this.batchSize = 1000;
  }

  async _transform(row, encoding, callback) {
    this.batch.push(row);
    
    if (this.batch.length >= this.batchSize) {
      await this.flush();
    }
    callback();
  }

  async _flush(callback) {
    await this.flush();
    callback();
  }

  async flush() {
    if (this.batch.length === 0) return;
    
    await db.batchInsert('users', this.batch);
    console.log(`Inserted ${this.batch.length} rows`);
    this.batch = [];
  }
}

pipeline(
  fs.createReadStream('large-data.csv'),
  csvParser(),
  new DBWriter(),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

결과

  • 메모리 사용량: 8GB → 150MB
  • 처리 시간: 45분 → 22분
  • 에러 핸들링도 pipeline의 콜백에서 일괄 처리 가능

백프레셔(backpressure)가 자동으로 처리되어 DB 쓰기 속도에 맞춰 파일 읽기 속도가 조절되는 점이 좋았다. 대용량 파일 처리가 필요할 때 Stream API는 필수다.