코딩은 내일부터

효율적인 알림구현과 대규모 데이터 처리를 위한 전략 본문

카테고리 없음

효율적인 알림구현과 대규모 데이터 처리를 위한 전략

zl존 비버 2024. 3. 11. 16:21
728x90

소개

현재 진행 중인 곽두철 프로젝트(롤 과외)는 수업에 대한 효율적인 관리를 위한 다양한 기능을 제공합니다. 이번 포스팅에서는 수업 관리 시스템의 핵심 요구사항과 그 구현 과정을 포스팅하겠다.

요구사항

  1. 수업 신청 알림
  2. 수업 10분 전 알림
  3. 수업 종료 알림
  4. 수업 종료 후 수업 횟수 차감
  5. 수업 철회

@Async를 사용해 알림 처리

보내야 될 알림이 많을 때 동기적으로 처리를 하게 되면 사용자가 받아야 할 시간에 알림이 늦게 도착하는 경우가 발생한다.

예를 들어 수업 10분 전에 알림이 울려야 하는 상황에 너무 많은 알림 요청으로 알림을 전송받는 게 늦어버리게 되면

알림이라는 기능을 제대로 수행했다고 볼 수 없다.

그래서 이를 통해 응답 시간을 최소화하고, 동기적 처리로 인한 지연을 방지할 수 있다.

@Async는 어떻게 동작하나요?

Spring에서 @Async 어노테이션이 적용된 메서드를 호출할 때,

실제 메소드 호출이 아닌 프록시 객체를 통해 비동기적으로 메소드를 실행된다.

@Async을 사용한 메서드는 Thread Safe하지 않으며, 별도의 스레드에서 작업이 수행된다.

설정은 @EnableAsync 어노테이션을 사용하여 구현할 수 있고,

아무런 설정을 하지 않으면 SimpleAsyncTaskExecutor이 기본으로 사용한다.

 

기본 설정을 사용하면 각 요청이 도착할 때마다 Thread Pool의 스레드를 사용하는 대신 새로운 스레드가 생성되어 작업이 수행된다.

이는 수천 개의 요청이 들어올 경우 새로운 스레드가 수천 개 생성되어 서버 리소스가 고갈되는 문제를 발생할 수 있다.

이러한 문제를 방지하기 위해 ThreadPoolTaskExecutor 설정이 필수적이라고 생각한다.

또한, @Async를 사용할 때 프록시를 이용한다는 점에서 주의해하는데

self-invocation 문제와 public 메서드에만 적용 가능하다는 두 가지 주의사항을 염두에 두어야 한다.

ThreadPoolTaskExecutor 설정

현재 곽두철 프로젝트의 ThreadPoolTaskExecutor는 위와 같이 설정돼 있다.

위에 설정을 보면 CorePoolSize가 4이다. 이는 4보다 이상의 작업을 처리해야 할 때 MaxPoolSize만큼 스레드가 증가하고 내부적으로 Integer.MAX_VALUE(기본값) 사이즈의 LinkedBlockingQueue를 생성해서 MaxPoolSize만큼의 스레드에서 작업을 처리할 수 없을 경우 Queue에서 대기하게 된다. Queue가 꽉 차게 되면 그때 RejectedExecutionHandler를 통해서 처리된다.

 

해당 설정은 자신의 서비스의 맞게 설정하면 된다.

EX) CorePoolSize가 4, MaxPoolSize가 10, QueueCapacity가 50으로 설정된 경우

  1. 최초 4개의 스레드가 생성
  2. 4개 이상의 작업이 들어오면, 추가 작업을 위해 새로운 스레드를 생성
  3. 최대 10개의 스레드까지 생성하여 작업을 처리
  4. 그 이상의 작업이 들어오면, 작업은 큐에 저장
  5. 큐에 50개의 작업이 저장된 상태에서 추가 작업이 들어오면, RejectedExecutionHandler가 작동하여 작업을 처리하거나 거부(기본적으로 거부)

@Async를 사용해 트랜잭션 분리하기

@Async어노테이션을 사용한 또 다른 이유가 있다.

현제 수업접수 로직과 알림 로직은 관심사가 달라 결합도를 낮추기 위해 Event를 사용하고 있다.

하지만 Event를 사용하면 EventPublisher와 EventListener는 동기적으로 처리되어, 동일한 트랜잭션 범위를 공유한다.

따라서 EventPublisher에서 문제가 발생하지 않더라도 EventListener에서 동작 중 예외가 발생하면 기존 작업 전체가 롤백되고, EventListener에서 예외가 발생하면 EventPublisher는 작업이 남아있어도 예외가 발생하고 해당 요청은 중단된다.

 

이로 인해 트랜잭션은 스레드 로컬이기 때문에 Thread Safe 하지 않은 @Async를 사용하여 별도의 스레드에서 비동기적으로 동작하면 응답 시간을 최소화할 수 있다.

 

어? 그러면 @TransactionalEventListener어노테이션만 사용하면 되지 않나요?

TransactionalEventListener에서 Transaction기본설정이 AFTER_COMMIT이던데요?라고 생각할 수 있다.

 

하지만 AFTER_COMMIT은 발행자의 트랜잭션 커밋 직후 이벤트를 처리한다.

즉, 트랜잭션만 커밋하고 해당요청은 동기적으로 Listener의 동작이 모두 끝나야지만 트랜잭션을 반납한다.

그렇기 때문에 TransactionalEventListener도 EventListener와 마찬가지고 Async와 함께 사용해야

비동기적으로 요청이 실행될 수 있다.

특정시간에 작업 실행하기

위 과정으로 수업신청, 수업철회는 알림을 보낼 수 있게 되었다.

하지만 수업 10분 전 알림, 수업 종료알림, 수업종료 후 수업 횟수 차감 같은 로직은 특정시간에 로직이 실행되야 하는데 어떻게 구현해야 할까?

TaskScheduler를 사용해서 특정시간에 작업 실행하기

TaskScheduler클래스에 schedule메서드를 통해 해당 로직을 특정 시간에 실행할 수 있다.

 

TaskScheduler를 사용하면 Thread1개가 백그라운드에 대기하여 특정작업을 처리한다.

그래서 여러 작업을 schedule을 등록해도 1개의 Thread를 사용함으로 불필요한 자원을 사용하지 않는다.

하지만 작업이 여러 개 실행될 때도 Thread 1개를 사용하기 때문에 많은 작업이 있을 때 1개의 Thread를 사용하는 문제점이 있다.

이러한 문제는 다음과 같이 @Async를 사용해서 간단하게 해결할 수 있다.

서비스 재시작 시 작업 재등록하기

만약 서비스가 꺼졌다 다시 시작할 때는 해당 스케줄이 모두 없어져있을 것이다.

서비스가 시작되면 전에 있던 작업을 다시 등록해 주는 작업을 해야 한다.

@EventListener(ApplicationReadyEvent.class)를 사용해서 서비스가 시작될 때 이벤트가 실행되게 설정한다.

그리고 이때 실행되는 로직은 수업 10분 전 알림, 수업 종료알림, 수업종료 후 수업 횟수 차감로직을 스케줄에 등록하기 때문에 수업이 시작하기 전 lesson, 수업 중인 lesson을 DB에서 가져와 스케줄에 등록해 준다.

Schedule 제거하기

이제 마지막 요구사항인 수업철회를 구현해야 할 차례이다.

TaskScheduler의 schedule메서드를 사용하면 ScheduledFuture를 반환하는데

이 객체의 cancel 메서드를 사용하면 해당 스케줄을 취소할 수 있다.

이렇게 schedule을 제거하려면 이전에 수업신청 할 때 반환되는 ScheduledFuture을 Map형태로 가지고 있다가 lesson이 취소되면 해당 lesson의 아이디로 저장된 ScheduledFuture들을 모두 취소하는 방법으로 구현하면 된다.

분산된 서버환경에서 중복으로 알림이 가지 않도록 하기

만약 서버가 2대 이상 띄워져 있을 때 중복으로 알림이 전송될 수 있다.

이러한 동시성을 해결할 수 있는 방법은

MySQL의 named lock, X-lock, Redis 등등 여러 가지 방법 중 Redis를 사용해서 해결해 보기로 했다.

(우아한 테크코스에서는 기술사용의 제약이 있어 Named lock을 사용했지만 지금은 아니다!)

Redis클라이언트의 Lettuce를 사용했는데 그 이유는 더 더 가벼운 클라이언트라는 장점이 있고 재시도가 필요하지 않은 로직이기 때문에 Lettuce에 Setnx메서드를 사용해 간단하게 구현할 수 있을 거 같아 다음과 같이 setIfAbsent메서드를 사용해서 구현을 했다.

또한 TTL값도 설정할 수 있어 Duration을 사용하여 원하는 만큼 지정해 줄 수 있다(이것 또한 서비스의 맞게 설정해 주면 된다.)

현재까지 구현한 로직의 문제점

이렇게 구현한 로직이 정상적인 거 같지만 문제점이 몇 가지 있다.

첫 번째로 Schedule을 제거하기 위해 Map으로 ScheduledFuture를 저장하는 상황

두 번째로 다중 서버에서 서버가 재시작했을 때 스케줄을 등록을 하면 중복으로 스케줄이 등록되는 상황

첫 번째 문제는 ScheduledFuture을 Map에 저장하게 되면 메모리를 사용하므로 많은 스케줄이 저장했을 때

리소스 부족으로 서버에 안 좋은 영향을 끼칠 수 있고

두 번째 문제는 Lock을 사용해서 스케줄을 등록한 작업은 최종적으로 1번 실행되겠지만 중복적으로 작업을 가지고 있는 문제가 생긴다.

이러한 문제는 알림이라는 요청이 100건, 1000건일 때는 문제가 없지만 만약 10만 건, 100만 건 이상이 됐을 때 문제가 생길 것이다.

그래서 대용량 데이터 처리와 일괄처리에 용이하고 관심사를 분리하기 위해 Spring Batch를 사용해 해당 문제를 해결해 보겠다.

 

이러한 2가지의 문제의 근본적인 문제는 스케줄을 Map의 저장하고 있다는것이다.

그래서 실제 해당 로직이 문제가 있는지 테스트를 해보았다.

현재 로직이 진짜 문제가 있을까?

로직에 문제가 있는지 부하테스트와 모니터링을 해봤다.

처음에는 Heap사용량이 3.1%로 양호한 반면 점점 17.1% -> 19.7%로 증가 했다. 마지막에는 심지어 57.6%가 넘어가 버렸다.

이제 해당 로직이 문제가 있는것을 실제로 확인 했으니 더 나은 로직으로 개선해야한다.

그래프로 보는 메모리 사용량....

(채택)Scheduled를 사용해서 알림 보내기

TaskScheduler를 사용해서 알림을 보내는 방식은 메모리의 부담이가 다른 방법으로 접근해야 했다.

그래서 Lesson에 끝나는 시간을 추가해서 1분에 한번씩 Scheduled로 시작, 종료 시간과 일치하는 수업에 알림을 보내는 방법이 현재로써는 가장 적합하다고 판단되어 Scheduled를 적용하였다.

 

 

 

만약 재시도 or 복잡한 로직을 포함해야한다면?

Spring Batch를 사용해 대용량 처리에 대비하기

만약 복잡하고 재시도가 필요한 즉, 트랜잭션으로 관리를 해야한다면 Spring Batch를 사용해서 구현할 것이다.

왜냐하면 Spring Batch를 선택한 이유는 대용량 데이터 처리와 일괄 처리에 최적화되어 있고, 재시도 및 실패 처리 등등 장점이 있어 선택했다.

다른 솔루션도 고려할 수 있지만, 현재 많은 레퍼런스, JPA 지원 등등 Spring을 사용하고 있어 좀 더 친숙한 Spring Batch를 사용했다.

 

Spring Batch에서 크게 job과 step이 있는데

job은 하나의 작업단위고 스텝의 목록으로 이뤄져 있고, step은 배치 처리 작업의 최소 단위다.

예를 들어 회원가입이라는 메서드를 job, 그 메소드 내부에 유저정보 가져오기, 저장하기 등등 이러한 로직을 step들이라고 생각하면 된다.

 

Spring Batch의 병렬처리

Spring Batch에는 3가지 병렬화 방식이 있다.

  • Multi-Threaded Step
  • Parallel Steps
  • AsyncItemProcessor & AsyncItemWriter

위에 3가지 방식중 Multi-Threaded Step을 사용해서 알람기능을 병렬화 했다.

왜냐하면 

Spring Batch는 실패 지점부터의 재시도한다. 해당 기능을 사용하려면 saveState가 true인 상태로 동작하게 된다.

하지만, Multi-Threaded Step을 사용하면 해당 장점을 누릴 수 없다.

10개의 스레드가 1000개의 데이터를 100개의 Chunk로 나누어 병렬 동작한다고 가정한다면 10번째 스레드가 실패했을 경우에 1~9번의 스레드는 성공했다는 보장이 없다. 왜냐하면 1~10 순차적이 아닌, 병렬로 동작하기 때문에 saveState를 false로 지정하여 사용해야한다.

이처럼 실패의 확률이 큰 Job에서는 사용하지 않지만, 알람을 보내는 간단한 로직 특성상 실패 확률이 매우 작다고 판단하여 Multi-Threaded Step 방식을 선택하였고 다른 방식은 해당로직에 필요없어서 사용하지 않은것도 있다.

 

Spring Batch에서 지원하는 JPA사용해야할까?

Spring Batch에서는 놀랍게도 JPA를 지원한다.

그래서 처음 Job과 Step을 작성할때 JPA를 사용해서 작성했다. 하지만 전부 작성하고 나서 의문이 들었다.

현재는 Job을 실행하는 로직과 비지니스 로직이 하나의 서버에서 관리를하지만, 만약 Spring Batch의 기능이 커져 서버를 분리해야할때

Batch서버에서도 Entity를 정의를 해줘야되서(JPA를 사용하니까) 중복코드가 발생한다.

이러한 중복코드가 발생할때 도메인 서버에 Entity가 변경되면 Batch 서버에서도 변경이 일어난다.

이러한 휴먼에러, 중복을 피하기 위해 JPA를 사용하지 않고 순수 SQL 쿼리를 작성해서 해당 Batch를 실행시키는게 좋다고 판단하여 코드를 고쳤다.

 

Spring Batch Job정의하기

@Configuration
public class SendNotificationBeforeLessonJobConfig {

    private static final int CHUNK_SIZE = 10;

    private final DataSource dataSource;
    private final SendNotificationItemWriter sendNotificationItemWriter;

    public SendNotificationBeforeLessonJobConfig(
            @Qualifier("dataSource") final DataSource dataSource,
            final SendNotificationItemWriter sendNotificationItemWriter) {
        this.dataSource = dataSource;
        this.sendNotificationItemWriter = sendNotificationItemWriter;
    }

    @Bean
    public Job sendNotificationBeforeLessonJob(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager) throws Exception {
        // Job 생성 및 설정
        return new JobBuilder("sendNotificationBeforeLessonJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(sendNotificationBeforeLessonStep(jobRepository, transactionManager))
                .build();
    }

    @Bean
    public Step sendNotificationBeforeLessonStep(
            JobRepository jobRepository,
            PlatformTransactionManager transactionManager) throws Exception {
        // Step 생성 및 설정
        return new StepBuilder("sendNotificationBeforeLessonStep", jobRepository)
                .<Map<String, Object>, Letter>chunk(CHUNK_SIZE)
                .reader(sendNotificationBeforeLessonItemReader())
                .processor(sendNotificationBeforeLessonItemProcessor())
                .writer(sendNotificationItemWriter)
                .taskExecutor(executor())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Map<String, Object>> sendNotificationBeforeLessonItemReader() throws Exception {
        // ItemReader 생성 및 설정
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("startedAt", LocalDateTime.now().plusMinutes(10));

        return new JdbcPagingItemReaderBuilder<Map<String, Object>>()
                .name("sendNotificationBeforeLessonItemReader")
                .dataSource(dataSource)
                .pageSize(CHUNK_SIZE)
                .queryProvider(sendNotificationBeforeLessonQueryProvider())
                .parameterValues(parameterValues)
                .rowMapper(new ColumnMapRowMapper())
                .build();
    }

    private PagingQueryProvider sendNotificationBeforeLessonQueryProvider() throws Exception {
        // QueryProvider 생성 및 설정
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("SELECT u.device_token AS student_token, u.name AS student_name, t.name AS teacher_name, l.started_at");
        queryProvider.setFromClause("FROM lesson l JOIN users u ON l.student_id = u.id JOIN users t ON l.teacher_id = t.id");
        queryProvider.setWhereClause("WHERE l.started_at <= :startedAt");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("l.started_at", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        return queryProvider.getObject();
    }

    @Bean
    public ItemProcessor<Map<String, Object>, Letter> sendNotificationBeforeLessonItemProcessor() {
        // ItemProcessor 생성 및 설정
        return item -> {
            String studentToken = (String) item.get("student_token");
            String studentName = (String) item.get("student_name");
            String teacherName = (String) item.get("teacher_name");
            LocalDateTime startedAt = (LocalDateTime) item.get("started_at");
            return Letter.of(studentToken, studentName, teacherName, startedAt, BEFORE_LESSON);
        };
    }

    @Bean
    public TaskExecutor executor() {
        // TaskExecutor 생성 및 설정
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

수업 10분 전 알림 로직은 위와 같이 작성했다.

위 코드에서 봐야 할 포인트는 executor메서드와 쿼리를 사용해서 데이터를 가져온 것 이 두 부분을 설명하겠다.

 

먼저 StepBuilder에서 taskExecutor메서드 파라미터에 new SimpleAsyncTaskExecutor()를 넘겨줄 수 있지만 SimpleAsyncTaskExecutor의 단점으로는 요청의 개수만큼 스레드를 생성해서 문제였다.

그래서 executor메서드와 같이 해당 서비스에 맞도록 ThreadPoolTaskExecutor를 설정하면 된다.

 

이렇게 쿼리를 사용한다면 Entity를 갈아엎는 거 아닌 이상 코드의 변화가 최소화될 수 있을 거라 생각하여 위와 같이 구현하였다!

 

최종적으로 성능 테스트

최종적으로는 Heap 사용량이 1~2%로 안정적으로 동작하는것을 확인했다!

 

적용코드:

https://github.com/KwakDooChul/doochul-backend/pull/13

 

[Feat] 알람 기능구현 및 spring batch를 사용한 스케줄기능 구현 by ingpyo · Pull Request #13 · KwakDooChul/doo

관련 이슈 #3 #11 #12 설명 예약된수업10분전출석안내알람을 주는 기능 구현 이용권 만료 전 사용자에게 알림을 주는 기능 구현 이용권은 횟수가 모두 소진되면 만료되면 알람을 주는 기능 구현 수

github.com