Spring Batch에서 JpaPagingItemReader와 JpaCursorItemReader의 이해
Spring Batch는 대용량 데이터 처리에 특화된 프레임워크로, 다양한 데이터 읽기 방식을 제공합니다.
특히, JpaPagingItemReader
와 JpaCursorItemReader
는 JPA를 사용하여 데이터베이스에서 데이터를 읽는 두 가지 주요 방법입니다.
JpaPagingItemReader
개념
JpaPagingItemReader
는 페이징 기법을 사용하여 데이터를 페이지 단위로 나누어 읽습니다.
이 방식은 한 번에 모든 데이터를 로드하는 대신, 설정된 페이지 크기에 따라 작은 데이터 집합을 순차적으로 처리합니다.
장점
- 메모리 효율성: 대용량 데이터를 메모리에 한 번에 로딩하지 않고, 페이지 단위로 나누어 처리하기 때문에 메모리 사용을 최적화 할 수 있습니다.
- 병렬 처리 용이: 데이터를 페이지로 분할하여 처리하므로, 병렬 처리를 통한 성능 향상이 가능합니다.
단점
- 성능 오버헤드: 각 페이지를 로드할 때마다 새로운 쿼리가 실행되므로, 데이터베이스와의 네트워크 통신 비용이 증가할 수 있습니다.
- 데이터 무결성 및 일관성 부합 가능: 각 페이지를 로드할 때마다 새로운 쿼리가 실행되므로 중간에 데이터 추가/변경시 데이터 무결성 및 일관성에 부합하게 될 수 있습니다. 따라서 DB의 트랜잭션 격리 수준, 특정 시점, 상태로 조회 등의 추가적인 고려가 필요합니다.
적합한 사용
대용량 데이터 처리가 필요하고 메모리 사용량을 제어해야 할 때, 또는 병렬 처리를 통해 성능을 향상시키고자 할 때 유리합니다.
사용 예시
@Bean
public JpaPagingItemReader<MyEntity> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
JpaPagingItemReader<MyEntity> reader = new JpaPagingItemReader<>();
reader.setQueryString("SELECT m FROM MyEntity m");
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(100); // 페이지 크기 설정
return reader;
}
JpaCursorItemReader
JpaCursorItemReader
는 데이터베이스의 커서를 사용하여 데이터를 순차적으로 읽어옵니다.
커서는 조회된 데이터 집합에 대한 포인터 역할을 하며, 데이터를 한 번에 한 행씩 처리합니다.
장점
- 실시간 처리: 데이터를 순차적으로 처리할 수 있어, 처리 지연 시간을 최소화 할 수 있습니다.
- 간단한 구현: 커서 기반 처리는 복잡한 페이징 로직 없이 구현할 수 있습니다.
단점
- 트랜잭션 관리 주의: 데이터베이스 연결이 계속 유지되어야 하므로, 트랜잭션 관리가 중요합니다.
- 메모리 관리: 대량의 데이터를 처리할 때, 메모리 부족 문제가 발생할 수 있습니다.
적합한 사용
실시간 데이터 처리가 필요하거나, 순차적인 데이터 처리가 중요한 경우에 적합합니다.
특히, 트랜잭션 관리에 능숙하고, 처리해야 할 데이터의 양이 메모리 용량을 초과하지 않을 때 유리합니다.
사용 예시
@Bean
public JpaCursorItemReader<MyEntity> jpaCursorItemReader(EntityManagerFactory entityManagerFactory) {
JpaCursorItemReader<MyEntity> reader = new JpaCursorItemReader<>();
reader.setQueryString("SELECT m FROM MyEntity m");
reader.setEntityManagerFactory(entityManagerFactory);
return reader;
}
Thread - safe
JpaCursorItemReader는 Thread Safe 하지 않다!
JpaCursorItemReader
는 데이터베이스 커서를 사용하여 데이터를 읽습니다.
데이터베이스 커서는 데이터베이스의 특정 위치를 가리키는 포인터로, 데이터를 순차적으로 한 줄 씩 읽을 수 있께 해줍니다.
이 방식은 메모리를 적게 사용하고 대량의 데이터를 효율적으로 처리할 수 있는 장점이 있지만, 여러 스레드가 동시에 같은 커서를 사용하려고 하면 문제가 발생할 수 있습니다.
예를 들어, 두 스레드가 동시에 같은 JpaCursorItemReader
인스턴스를 사용하여 데이터를 읽으려고 한다고 가정해보겠습니다.
스레드 A가 데이터의 한 행을 읽고 다음 행으로 이동하는 동안, 스레드 B도 동시에 데이터를 읽으려고 시도합니다.
이 경우, 스레드 B가 어느 위치의 데이터를 읽어야 할 지, 스레드 A가 이미 읽은 데이터를 다시 읽게 될지, 아니면 스레드 A가 다음으로 이동할 준비를 하고 있는 행을 읽게 될지 예측할 수 없습니다.
이로 인해 데이터 무결성 문제, 성능 저하, 예상치 못한 예외 발생 등이 일어날 수 있으며, 이러한 이유로 JpaCursorItemReader
는 thread-safe 하지 않습니다.
JpaPagingItemReader는 Thread Safe 하다!
JpaPagingItemReader
는 데이터를 페이징 단위로 처리합니다.
각 페이지는 데이터베이스로부터 독립적으로 조회되며, 한 번에 한 페이지의 데이터만 메모리에 적재되어 처리됩니다.
이 방식은 각 스레드가 서로 다른 페이지를 처리하도록 할 수 있어, 스레드 간의 충돌 없이 병렬 처리를 수행할 수 있습니다.
예를 들어, 스레드 A가 1페이지의 데이터를 처리하고 있는 동안, 스레드 B는 2페이지의 데이터를 독립적으로 처리할 수 있습니다.
각 스레드는 자신만의 데이터 세트를 가지고 작업하기 떄문에, 스레드간에 데이터 접근에 대한 충돌이 일어나지 않습니다.
이로 인해 JpaPagingItemReader
는 JpaCursorItemReader
에 비해 thread-safe한 처리 방식을 제공합니다.
그럼 JpaCursorItemReader에서는 thread-safe하게 하기 위해 어떻게 해야 되는데?
- Synchronized ItemReader 래퍼 구현
기존의JpaCursorItemReader
를 사용해야 한다면, thread-safe 하지 않은 문제를 해결하기 위해 동기화된 래퍼를 구현할 수 있습니다. 이 방법은JpaCursorItemReader
의 메소드 호출을synchronized
블록이나 메소드로 감싸서, 한 번에 하나의 스레드만 접근할 수 있도록 합니다. 이는 성능 저하를 일으킬 수 있지만, 병렬 처리 환경에서 데이터 무결성을 보장할 수 있습니다.
public class SynchronizedItemReader<T> implements ItemReader<T> {
private final ItemReader<T> delegate;
public SynchronizedItemReader(ItemReader<T> delegate) {
this.delegate = delegate;
}
@Override
public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return delegate.read();
}
}
@Bean
public SynchronizedItemStreamReader<NotificationEntity> sendNotificationItemReader() {
JpaCursorItemReader<NotificationEntity> itemReader = new JpaCursorItemReaderBuilder<NotificationEntity>()
.name("sendNotificationItemReader")
.entityManagerFactory(entityManagerFactory)
// 이벤트(event)가 수업 전이며, 발송 여부(sent)가 미발송인 알람이 조회 대상이 됩니다.
.queryString("select n from NotificationEntity n where n.event = :event and n.sent = :sent")
.parameterValues(Map.of("event", NotificationEvent.BEFORE_CLASS, "sent", false))
.build();
return new SynchronizedItemStreamReaderBuilder<NotificationEntity>()
.delegate(itemReader)
.build();
}
- Partitioning 사용
Partitioning은 작업을 독립적인 여러 파티션으로 나누어 각각을 별도의 스레드에서 처리하게 하는 방법입니다.
이 접근법은 대량의 데이터를 효율적으로 병렬 처리할 수 있게 해줍니다.
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.HashMap;
import java.util.Map;
public class RangePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
for (int i = 1; i <= gridSize; i++) {
ExecutionContext executionContext = new ExecutionContext();
// 예시: 각 파티션에 대한 구성 정보 설정
executionContext.putInt("minId", i * 1000);
executionContext.putInt("maxId", (i + 1) * 1000 - 1);
result.put("partition" + i, executionContext);
}
return result;
}
}
다음으로, 각 파티션을 처리할 스텝을 정의합니다.
이 스텝에서는 JpaCursorItemReader
또는 다른 ItemReader
를 사용할 수 있습니다.
각 파티션은 별도의 ExecutionContext
를 가지므로, 이 컨텍스트에 따라 데이터를 읽어 처리할 수 있습니다.
마지막으로, 파티셔닝을 사용하는 메인 스텝을 정의합니다.
이 스텝은 파티셔너와 개별 파티션을 처리할 스텝을 사용하여 구성됩니다.
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.builder.PartitionStepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BatchPartitioningConfig {
private final StepBuilderFactory stepBuilderFactory;
public BatchPartitioningConfig(StepBuilderFactory stepBuilderFactory) {
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Step partitionedStep() {
Partitioner partitioner = new RangePartitioner();
PartitionStepBuilder partitionStepBuilder = stepBuilderFactory.get("partitionedStep")
.partitioner("slaveStep", partitioner);
// 여기서 slaveStep은 각 파티션을 처리하는 스텝을 정의하는 메소드입니다.
// partitionStepBuilder.step(slaveStep()).gridSize(10).taskExecutor(taskExecutor()).build();
return partitionStepBuilder
.gridSize(10) // 파티션의 수
// .taskExecutor(taskExecutor()) // 병렬 실행을 위한 TaskExecutor 설정
.build();
}
// 여기에 taskExecutor와 slaveStep의 구현을 추가합니다.
}