Elasticsearch 에 RDB의 데이터를 마이그레이션 하기 위한 과정이다.

<aside> 📝 마이그레이션을 위한 예상 플로우는 아래와 같다.

  1. RDB에서 ES에 적재할 테이블 데이터를 JSON으로 추출한다.
  2. 추출한 데이터를 ES api bulk 요구 타입으로 변환한다.
  3. 변환한 데이터를 api를 통해 ES에 적재한다.

</aside>

RDB에서 추출한 데이터를 변환하고 ES에 적재하는 과정을 자동화 하기 위해 Bulk 스크립트를 만들기로 하였다.

이 밖에도 Json 파일을 stream으로 읽어오면서 발생한 문제와 ES API를 통해 bulk하는 과정에서 body에 담긴 데이터의 사이즈가 커서 발생한 문제들이 더 있었는데 해결 과정을 메모해두지 않아서 코드에서 주석을 통해 정리해보려 한다.

convert-es-bulk.ts

// # convert-es-bulk.ts

// 스크립트를 실행할때의 인자값을 통해 JSON 파일 경로를 받아온다.
const json_file = process.argv[2];

require('dotenv').config();
const fs = require('fs');
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { Client } = require('@elastic/elasticsearch');

// ES api 를 사용하기 위해 client init
const client = new Client({
  node: process.env.ELASTIC_HOST,
  auth: {
    username: process.env.ELASTIC_USERNAME,
    password: process.env.ELASTIC_PASSWORD,
  },
  tls: {
    ca: process.env.ELASTIC_TLS_CRT,
    rejectUnauthorized: false,
  },
});

// ES bulk 할때 한번에 보낼 데이터 raw 사이즈
// RDB에서 1개의 raw지만 ES bulk를 위한 json 양식의 경우 
// 해당 raw 위쪽에 index를 위한 raw 를 추가해줘야함. 그렇기 때문에, 처리를 위한 size를
// 10만개로 지정을 하여도 실제로 ES에 bulk로 적재되는 raw는 1/2인 5만개로 생각해야함.
// 해당 사이즈는 인프라 환경에 따라 변경이 필요
// 일정 사이즈를 넘기게 되면 ES bulk api request에서 413에러를 뱉을것임.
const splice_size = 100000;

// 변환된 json raw가 담길 배열
const es_json = [];

// 변환과정에서 소요된 시간을 측정하기 위한 변수
const delay = Date.now();

// 총 변환한 json raw의 갯수를 확인하기 위한 변수
let json_file_count = 0;

const stream = fs.createReadStream(json_file);
stream
  .pipe(parser())
  .pipe(streamArray())
  .on('data', (d) => processData(d.value))
  .on('end', () => endProcess());

// Stream 읽기가 끝나면 변환을 위해 소요된 시간과 변환된 raw의 갯수를 확인하기 위한 로깅
// 그리고 ES Bulk를 시작함.
async function endProcess() {
  console.log(
    `End convert process json file to es bulk json type (+${
      Date.now() - delay
    }ms)`,
  );
  console.log(
    `Json raw count : ${json_file_count}, ES bulk json raw count : ${es_json.length}`,
  );
  await esBulk();
}

// Stream을 읽으며 es bulk를 위한 json 데이터로 변환하는 과정
function processData(data) {
  json_file_count++;

  es_json.push({
    index: {
      _index: 'board_community',
      _id: data.board_id.toString(),
    },
  });
  // RDB 테이블의 어떤 컬럼들을 적재할지 이곳에서 지정해주면 된다. RDB의 컬럼 이름과
  // ES에 필드 네임이 다를경우에도 아래에서 변경해줄수 있음.
  es_json.push({
    board_id: data.board_id,
    board_title: data.board_title,
    board_contents: data.board_contents,
    board_type: data.board_type,
    user_name: data.user_name,
  });
}

// 
async function esBulk() {
  const bulk_count = Math.ceil(es_json.length / splice_size);

  for (let i = 0; i < bulk_count; i++) {
    const response = await client.bulk({
      body: i + 1 != bulk_count ? es_json.splice(0, splice_size) : es_json,
    });
    console.log(
      `[${i + 1}] ES bulk commit raw size : ${response.items.length}`,
    );
  }
}
$ npm run elastic:bulk ../board.json 

> [email protected] elastic:bulk                                             
> node --max-old-space-size=16000 elk/tools/convert-es-bulk.ts ../board.json

End convert process json file to es bulk json type (+37943ms)
Json raw count : 2459243, ES bulk json raw count : 4918486
[1] ES bulk commit raw size : 50000
[2] ES bulk commit raw size : 50000
[3] ES bulk commit raw size : 50000
[4] ES bulk commit raw size : 50000
[5] ES bulk commit raw size : 50000
[6] ES bulk commit raw size : 50000
[7] ES bulk commit raw size : 50000
[8] ES bulk commit raw size : 50000
[9] ES bulk commit raw size : 50000
[10] ES bulk commit raw size : 50000
[11] ES bulk commit raw size : 50000
[12] ES bulk commit raw size : 50000
[13] ES bulk commit raw size : 50000
[14] ES bulk commit raw size : 50000
[15] ES bulk commit raw size : 50000
[16] ES bulk commit raw size : 50000
[17] ES bulk commit raw size : 50000
[18] ES bulk commit raw size : 50000
[19] ES bulk commit raw size : 50000
[20] ES bulk commit raw size : 50000
[21] ES bulk commit raw size : 50000
[22] ES bulk commit raw size : 50000
[23] ES bulk commit raw size : 50000
[24] ES bulk commit raw size : 50000
[25] ES bulk commit raw size : 50000
[26] ES bulk commit raw size : 50000
[27] ES bulk commit raw size : 50000
[28] ES bulk commit raw size : 50000
[29] ES bulk commit raw size : 50000
[30] ES bulk commit raw size : 50000
[31] ES bulk commit raw size : 50000
[32] ES bulk commit raw size : 50000
[33] ES bulk commit raw size : 50000
[34] ES bulk commit raw size : 50000
[35] ES bulk commit raw size : 50000
[36] ES bulk commit raw size : 50000
[37] ES bulk commit raw size : 50000
[41] ES bulk commit raw size : 50000
[42] ES bulk commit raw size : 50000
[43] ES bulk commit raw size : 50000
[44] ES bulk commit raw size : 50000
[45] ES bulk commit raw size : 50000
[46] ES bulk commit raw size : 50000
[47] ES bulk commit raw size : 50000
[48] ES bulk commit raw size : 50000
[49] ES bulk commit raw size : 50000
[50] ES bulk commit raw size : 9243