Java/Spring

Spring Batch, Migration, 튜닝 및 OOM 해결 후기

체리필터 2019. 8. 12. 13:43
728x90
반응형

회사에서 진행하는 새로운 프로젝트를 위해서 회원 정보 1,000만건에 가까운 데이터를 마이그레이션 해야 하는 작업이 필요 했다.

회원 정보만 1,000만건이고 기타 회원이 가지고 있는 부가적인 정보들을 포함하자면 거의 1억건에 가까운 데이터로 보여졌는데 이를 어떻게 마이그레이션 해야 하나 고민이 깊었다.

기존 레거시 시스템에서 새로운 시스템으로 이전을 해야 해서 내부적으로 새로운 데이터 베이스 시스템에 맞게 구조를 변경하고 적절한 로직을 가미해서 옮겨야 했기 때문에 단순 데이터 이전은 아니였다.

할줄 아는 거라고는 별로 없으니 Spring Batch를 사용할 수 밖에...

 

한동안 JPA를 편하게 써 와서  JPA를 사용하고 싶었지만...

Bulk Insert가 없어서 저 많은 Insert문을 날리느니 안하느니만 못하여 오랜만에 MyBatis를 이용하게 되었다.

MyBatis를 사용하여 로직을 구성하고 돌려보니 회원정보 기준 200만건이 넘어가면서부터 슬슬 느려지기 시작하더니 230만건 쯤에 OOM을 내면서 배치 시스템이 죽었다.

 

결론적으로 보자면 느려진 것의 원인과 OOM의 원인이 조금 달라 따로 따로 기록해 둔다.

우선 배치와 관련된 기본적인 정보는 아래의 블로그를 참고하였다.

 

https://jojoldu.tistory.com/324

 

1. Spring Batch 가이드 - 배치 어플리케이션이란?

Spring Batch In Action이 2011년 이후 개정판이 나오지도 않고, 한글 번역판도 없고, 국내 Spring Batch 글 대부분이 튜토리얼이거나 공식 문서 중 일부분을 짧게 번역한 내용들이라 대용량 시스템에서 사용할때..

jojoldu.tistory.com

위에서 배치 가이드를 시리즈 별로 2, 3, 4... 죽 이거가면서 보면 어느정도 배치에 대해 알게 될 수 있다.

나도 뭐 많이 해 보지 않고 이제 2개의 배치만 만들어 봤기에 제대로 만든 것인지도 모르겠다.

 

우선 application.yml 의 내용이다.

spring:
    profiles:
      active: local
    application:
      name: ferrari

spring.batch.job.names: ${job.name:NONE}

job.name:NONE은 job name이 파라미터로 들어오지 않을 경우 실행하지 않는다는 의미이며, 위에 참고한 블로그네 나와 있다.

 

다음은 ativie local을 반영한 application-local.yml 의 환경 구성 파일이다.

spring:
    profiles: local
    domain: localhost
    datasource-user:
        datasource-write:
            driverClassName: com.mysql.cj.jdbc.Driver
            jdbcUrl: jdbc:mysql://dburl:3306/yeogi_user?autoReconnect=true&useSSL=false
            username: id
            password: password
            maximumPoolSize: 30
            minimumIdle: 5
            poolName: write
            readOnly: false
        datasource-read:
            driverClassName: com.mysql.cj.jdbc.Driver
            jdbcUrl: jdbc:mysql://dburl:3306/yeogi_user?autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
            username: id
            password: password
            maximumPoolSize: 30
            minimumIdle: 5
            poolName: read
            readOnly: true
        legacy-datasource-write:
            driverClassName: com.mysql.cj.jdbc.Driver
            jdbcUrl: jdbc:mysql://dburl:3306/yeogi?autoReconnect=true&useSSL=false
            username: id
            password: password
            maximumPoolSize: 30
            minimumIdle: 5
            poolName: write
            readOnly: false
        legacy-datasource-read:
            driverClassName: com.mysql.cj.jdbc.Driver
            jdbcUrl: jdbc:mysql://dburl:3306/yeogi?autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
            username: id
            password: password
            maximumPoolSize: 30
            minimumIdle: 5
            poolName: read
            readOnly: true
logging:
  level:
    kr.co.within.batch.ferrari: DEBUG
    jpa:
        show-sql: true
        hibernate:
            ddl-auto: none
        properties:
            hibernate:
                show_sql: true
                use_sql_comments: true
                format_sql: true
                type: trace
                dialect: org.hibernate.dialect.MySQL55Dialect
                hbm2ddl:
                  auto: none

 

다음은 위 환경 구성을 반영한 infrastructure 단의 ReplicationRoutingDataSource.java, UserDataSourceConfig.java 소스이다.

import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Slf4j
public class ReplicationRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        String dataSourceType = TransactionSynchronizationManager.isCurrentTransactionReadOnly() ? "read" : "write";
        log.info("### batch current dataSourceType : {}", dataSourceType);
        return dataSourceType;
    }
}
import kr.co.within.batch.ferrari.infrastructure.config.ReplicationRoutingDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = "kr.co.within.batch.ferrari.domain.user.repository", sqlSessionFactoryRef = "userSqlSessionFactory")
public class UserDataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource-user.datasource-read")
    public DataSource readUserDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource-user.datasource-write")
    public DataSource writeUserDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public DataSource routingUserDataSource(@Qualifier("writeUserDataSource") DataSource writeUserDataSource, @Qualifier("readUserDataSource") DataSource readUserDataSource) {
        ReplicationRoutingDataSource routingDataSource = new ReplicationRoutingDataSource();

        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("write", writeUserDataSource);
        dataSourceMap.put("read", readUserDataSource);
        routingDataSource.setTargetDataSources(dataSourceMap);
        routingDataSource.setDefaultTargetDataSource(readUserDataSource());

        return routingDataSource;
    }

    @Bean
    public DataSource userDataSource(@Qualifier("routingUserDataSource") DataSource routingUserDataSource) {
        return new LazyConnectionDataSourceProxy(routingUserDataSource);
    }


    @Bean(name = "userSqlSessionFactory")
    public SqlSessionFactory userSqlSessionFactory(@Qualifier("userDataSource") DataSource userDataSource,
                                                   ApplicationContext applicationContext)
            throws Exception {
        SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
        factory.setDataSource(userDataSource);
        factory.setVfs(SpringBootVFS.class);
        factory.setTypeAliasesPackage("kr.co.within.batch.ferrari.domain.user");
        factory.setMapperLocations(applicationContext.getResources("classpath*:mapper/user/*Mapper.xml"));
        factory.setConfigurationProperties(getMyBatisProperties());
        return factory.getObject();
    }

    private Properties getMyBatisProperties() {
        Properties properties = new Properties();
        properties.put("cacheEnabled", false);
        properties.put("lazyLoadingEnabled", true);
        properties.put("localCacheScope", "STATEMENT");
        properties.put("defaultExecutorType", "BATCH");
        return properties;
    }

    @Bean(name = "userSqlSession", destroyMethod = "clearCache")
    public SqlSessionTemplate userSqlSessionTemplate(@Qualifier("userSqlSessionFactory") SqlSessionFactory userSessionFactory) {
        return new SqlSessionTemplate(userSessionFactory);
    }

    @Bean(name = "userTransactionManager")
    public DataSourceTransactionManager userTransactionManager(@Qualifier("userDataSource") DataSource userDataSource) {
        return new DataSourceTransactionManager(userDataSource);
    }
}

 

실제 배치 소스

@Configuration
public class Migration5BatchConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step0CommonTasklet step0CommonTasklet() {
        return new Step0CommonTasklet();
    }

    @Bean
    public Step1UserTasklet step1UserTasklet() {
        return new Step1UserTasklet();
    }
    
    ...

    @Bean
    public Job migration5JobForUser(Step step0Common, Step step1User) {
        return jobBuilderFactory.get("migration5JobForUser")
                .flow(step0Common)
                .next(step1User)
                .end()
                .build();
    }

    @Bean
    public Job migration5ForDevice(Step step2Device) {
        return jobBuilderFactory.get("migration5ForDevice")
                .flow(step2Device)
                .end()
                .build();
    }
    
    ...

    @Bean
    public Job migration5ForNicknameHistory(Step step9NicknameHistory, Step step10NicknamePool) {
        return jobBuilderFactory.get("migration5ForNicknameHistory")
                .flow(step9NicknameHistory)
                .next(step10NicknamePool)
                .end()
                .build();
    }

    /**
     * 공통 정보 미리 셋팅
     * @param step0CommonTasklet
     * @return
     */
    @Bean
    public Step step0Common(Step0CommonTasklet step0CommonTasklet) {
        return stepBuilderFactory.get("step0Common")
                .tasklet(step0CommonTasklet)
                .build();
    }

    /**
     * Step1 회원 정보 이관
     * @param step1UserTasklet
     * @return
     */
    @Bean
    public Step step1User(Step1UserTasklet step1UserTasklet) {
        return stepBuilderFactory.get("step1User")
                .tasklet(step1UserTasklet)
                .build();
    }
    
    ...
}
@Slf4j
@Component
public class Migration5NotificationListener extends JobExecutionListenerSupport {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("### beforeJob() ###");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("### afterJob() ###");

        if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
            log.info("Batch Job Completed.");
        }
        else if(jobExecution.getStatus() == BatchStatus.FAILED){
            log.error("Batch Job Failed.");

        }
    }
}

 

각 Tasklet

@Slf4j
public class Step0CommonTasklet implements Tasklet {
    @Autowired
    private Migration5Service migration5Service;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        log.info("=================== Step0CommonTasklet Start ===================");

        migration5Service.makeAgreementMeta();

        migration5Service.makeGroupMeta();

        log.info("=================== Step0CommonTasklet End ===================");
        return RepeatStatus.FINISHED;
    }
}
@Slf4j
public class Step1UserTasklet implements Tasklet {
    @Autowired
    private Migration5Service migration5Service;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        log.info("=================== Step1UserTasklet Start ===================");

        // TODO 1. 유저 마이그레이션
        migration5Service.migrationUser();

        log.info("=================== Step1UserTasklet End ===================");
        return RepeatStatus.FINISHED;
    }
}

 

위와 같이 해 둔 상태에서 각 Job을 실행하게 되면 OOM이 발생 하게 되었다.

증상은 위에서 설명 했듯이 200만건이 넘게 되면 점점 느려지게 되었고 230만건 정도에 어김없이 OOM이 발생하게 되었다.

일단 느려지게 된 이유는 MySQL에서 Limit의 사용이 원인이였다.

limit를 사용하게 될 경우 페이지가 뒤로 갈 수록 느려지게 되는데 이게 200만건 부터 기하 급수적으로 느려지기 시작했기에 이대로라면 마이그레이션을 할 수 없는 수준이였다.

따라서 앱에서 무한스크롤로 다음 페이지를 가져오는 방식과 비슷하게 Primary Key를 가지고 페이징을 하게 되었다.

가령 100번까지 가지고 왔다면 다음 페이지는 Primary Key가 100보다 크면서 1,000개 가지고 오기 식으로...

 

이렇게 해서 속도문제는 개선이 되었지만 여전히 OOM이 발생하였다.

실제 로직을 구현하는 소스 단에서 다음과 같은 부분을 튜닝 하였다.

    private List<UserVo> userVoList = new ArrayList<>();
    private List<FriendRecommendHistoryVo> friendRecommendHistoryVoList = new ArrayList<>();
    private List<AgreementInstanceVo> agreementInstanceVoList = new ArrayList<>();
    private List<GroupInstanceVo> groupInstanceVoList = new ArrayList<>();
    private List<BlockVo> blockVoList = new ArrayList<>();

    /**
     * 회원정보 마이그레이션, 친구 추천 목록까지 이관
     */
    public void migrationUser() {
        int page = 1;
        long start = 0;
        
        ...
        
        while (true) {
            legacyCommonPagingParam.setPage(page);
            legacyCommonPagingParam.setStart(start);
            legacyCommonPagingParam.setOffset(5000);

            log.info("======================= start ======================= : {}", start);
            List<LegacyUserVo> legacyUserVoList = legacyUserService.selectMigrationTargetUser(legacyCommonPagingParam);

            userVoList.clear();
            friendRecommendHistoryVoList.clear();
            agreementInstanceVoList.clear();
            groupInstanceVoList.clear();
            blockVoList.clear();

            for ( LegacyUserVo legacyUserVo:legacyUserVoList) {
                // TODO. 각 비즈니스 로직 처리...

                start = legacyUserVo.getUno();
            }
            legacyCommonPagingParam.setStart(start);

            // 회원 목록과 친구추천 이력을 저장
            userService.bulkInsert(userVoList);
            userCount += userVoList.size();

            log.info("Heap : {}", ManagementFactory.getMemoryMXBean().getHeapMemoryUsage());
            log.info("NonHeap : {}", ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());

            page++;
            if(Optional.ofNullable(legacyUserVoList).orElse(new ArrayList<>()).size() == 0) {
                break;
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }

        snsAgreementMetaVo = null;
        pushAgreementMetaVo = null;
        recentAdAgreementMetaVo = null;
        groupMetaVoList = null;
        groupMetaVoMap = null;
    }

유의해 보아야 할 부분은 멤버 변수로 마이그레이션 해야 할 내용들을 빼 놓고 재사용한 점이다.

보통의 경우 instance를 재생성 하게 되는 비용이 크기 때문에 위에 처럼 만들어 놓고 한번 사용 했다면 clear로 메모리를 비워 줬다.

메모리를 null 처리 하는 경우에는 메모리 주소까지 없애는 것이기에 loop를 돌 때마다 instance를 새롭게 생성해야 하지만 clear로 비워주는 경우에는 instance 생성 비용을 아낄 수 있게 된다.

 

이러한 아이디어는 꽤 괜찮아 보이긴 했는데 실제 이 부분이 원인은 아니였다.

역시나 비슷한 부분에서 OOM이 발생 하였다.

그래서 사용하게 된 MyBatis의 문제는 아닐까란 생각에 해당 부분을 검색하고 찾아 보았다.

 

그래서 Service 부분에서 MyBatis를 사용하는 부분에 다음과 같이 수정 하였다.

@Slf4j
@Service
public class UserService {
    @Autowired
    private UserQueryMapper userQueryMapper;

    @Autowired
    private DormancyUserQueryMapper dormancyUserQueryMapper;

    public void bulkInsert(List<UserVo> userVoList) {
        if(userVoList.size() == 0) {
            return;
        }

        userQueryMapper.bulkInsert(userVoList);
        userQueryMapper.flush();
    }
    
    ...
}

userQueryMapper.flush()를 통해 MyBatis 내부적으로 가지고 있는 캐쉬 데이터를 플러싱 해 주었다.

또한 SQL문을 작성한 xml문에도 다음과 같이 해 주었다.

 

    <insert id="bulkInsert" parameterType="list" flushCache="true">
        INSERT INTO `User`
          (
            ...
          )
        VALUES
        <foreach collection="list" item="userVo" separator=",">
          (
            ...
          )
        </foreach>
    </insert>

flushCache="true" 부분을 추가해 주었다.

 

이렇게 해 둔 상태로 마이그레이션을 돌리니 1,000만건 정도 되는 회원 데이터가 큰 무리없이 잘 마이그레이션 됨을 볼 수 있었다.

 

결과론적으로 보자면 크게 어려운 부분은 없지만, 이 내용을 찾고 알게 되기까지 많은 삽질이 있었다.

추후 다른 분들은 삽질하지 않도록 글로 남겨 둔다.

 

 

 

 

 

 

 

 

 

 

728x90
반응형