Node.js 스트림으로 대용량 CSV 파일 처리 최적화
문제 상황
레거시 시스템에서 신규 DB로 사용자 데이터를 이관하는 작업을 맡았다. 총 1500만 건의 데이터가 담긴 120GB 크기의 CSV 파일을 파싱해야 했는데, fs.readFileSync로 전체 파일을 메모리에 올리는 순간 프로세스가 죽어버렸다.
기존 코드
const fs = require('fs');
const csv = require('csv-parser');
const data = fs.readFileSync('users.csv', 'utf-8');
// 메모리 부족으로 여기서 죽음
스트림 기반 해결
Node.js의 스트림을 사용해 chunk 단위로 읽고 처리하도록 변경했다.
const fs = require('fs');
const csv = require('csv-parser');
const { Transform } = require('stream');
const processStream = new Transform({
objectMode: true,
transform(row, encoding, callback) {
// 데이터 변환 및 검증
const processed = {
id: row.user_id,
email: row.email.toLowerCase(),
createdAt: new Date(row.created_at)
};
callback(null, processed);
}
});
let count = 0;
const batchSize = 1000;
let batch = [];
fs.createReadStream('users.csv')
.pipe(csv())
.pipe(processStream)
.on('data', async (row) => {
batch.push(row);
if (batch.length >= batchSize) {
processStream.pause();
await db.insertMany(batch);
batch = [];
processStream.resume();
}
count++;
if (count % 100000 === 0) {
console.log(`Processed: ${count}`);
}
})
.on('end', async () => {
if (batch.length > 0) {
await db.insertMany(batch);
}
console.log(`Total: ${count}`);
});
결과
- 메모리 사용량: 8GB → 250MB
- 처리 시간: 약 45분 (배치 insert 병행)
- 안정성: 중단 없이 완료
스트림의 pause()/resume()을 활용해 백프레셔를 제어한 것이 핵심이었다. DB insert가 밀릴 때 읽기를 멈춰 메모리 폭증을 방지했다.
추가 개선 사항
프로덕션에서는 진행 상황을 Redis에 저장해 중단 시 이어서 처리할 수 있도록 했다. 또한 worker_threads로 CPU 바운드 작업을 분리해 전체 처리 시간을 30분대로 단축했다.