Node.js 스트림으로 대용량 CSV 파일 처리 최적화

문제 상황

레거시 시스템에서 신규 DB로 사용자 데이터를 이관하는 작업을 맡았다. 총 1500만 건의 데이터가 담긴 120GB 크기의 CSV 파일을 파싱해야 했는데, fs.readFileSync로 전체 파일을 메모리에 올리는 순간 프로세스가 죽어버렸다.

기존 코드

const fs = require('fs');
const csv = require('csv-parser');

const data = fs.readFileSync('users.csv', 'utf-8');
// 메모리 부족으로 여기서 죽음

스트림 기반 해결

Node.js의 스트림을 사용해 chunk 단위로 읽고 처리하도록 변경했다.

const fs = require('fs');
const csv = require('csv-parser');
const { Transform } = require('stream');

const processStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // 데이터 변환 및 검증
    const processed = {
      id: row.user_id,
      email: row.email.toLowerCase(),
      createdAt: new Date(row.created_at)
    };
    callback(null, processed);
  }
});

let count = 0;
const batchSize = 1000;
let batch = [];

fs.createReadStream('users.csv')
  .pipe(csv())
  .pipe(processStream)
  .on('data', async (row) => {
    batch.push(row);
    if (batch.length >= batchSize) {
      processStream.pause();
      await db.insertMany(batch);
      batch = [];
      processStream.resume();
    }
    count++;
    if (count % 100000 === 0) {
      console.log(`Processed: ${count}`);
    }
  })
  .on('end', async () => {
    if (batch.length > 0) {
      await db.insertMany(batch);
    }
    console.log(`Total: ${count}`);
  });

결과

  • 메모리 사용량: 8GB → 250MB
  • 처리 시간: 약 45분 (배치 insert 병행)
  • 안정성: 중단 없이 완료

스트림의 pause()/resume()을 활용해 백프레셔를 제어한 것이 핵심이었다. DB insert가 밀릴 때 읽기를 멈춰 메모리 폭증을 방지했다.

추가 개선 사항

프로덕션에서는 진행 상황을 Redis에 저장해 중단 시 이어서 처리할 수 있도록 했다. 또한 worker_threads로 CPU 바운드 작업을 분리해 전체 처리 시간을 30분대로 단축했다.