Node.js 스트림으로 대용량 CSV 파일 처리하기
문제 상황
고객사로부터 50GB 크기의 CSV 파일을 받아 데이터베이스에 적재해야 하는 요구사항이 들어왔다. 처음에는 fs.readFile로 전체를 메모리에 올려 처리하려 했으나, 당연히 메모리 부족으로 프로세스가 죽었다.
Stream 기반 처리
Node.js의 Stream API를 사용하면 파일을 청크 단위로 나눠서 처리할 수 있다. 메모리에는 현재 처리 중인 청크만 올라가므로 파일 크기와 무관하게 일정한 메모리로 작동한다.
const fs = require('fs');
const { pipeline } = require('stream');
const { Transform } = require('stream');
const csvParser = require('csv-parser');
class DataTransformer extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.batch = [];
this.batchSize = 1000;
}
_transform(row, encoding, callback) {
// 데이터 검증 및 변환
const transformed = {
id: row.id,
name: row.name?.trim(),
createdAt: new Date(row.created_at)
};
this.batch.push(transformed);
if (this.batch.length >= this.batchSize) {
this.push(this.batch);
this.batch = [];
}
callback();
}
_flush(callback) {
if (this.batch.length > 0) {
this.push(this.batch);
}
callback();
}
}
class DatabaseWriter extends Transform {
constructor(db, options) {
super({ ...options, objectMode: true });
this.db = db;
}
async _transform(batch, encoding, callback) {
try {
await this.db.bulkInsert('users', batch);
console.log(`Inserted ${batch.length} rows`);
callback();
} catch (error) {
callback(error);
}
}
}
// 실행
pipeline(
fs.createReadStream('./large-file.csv'),
csvParser(),
new DataTransformer(),
new DatabaseWriter(db),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
성능 개선
배치 처리를 통해 DB 쓰기 횟수를 줄였고, highWaterMark 옵션으로 버퍼 크기를 조절해 처리 속도를 높였다. 50GB 파일을 약 2시간에 처리했으며, 메모리 사용량은 200MB 이하로 유지됐다.
에러 처리는 중단된 지점부터 재개할 수 있도록 마지막 처리 ID를 파일에 기록하는 방식으로 구현했다. 이후 비슷한 대용량 파일 처리 작업에서도 동일한 패턴을 사용하고 있다.