💻 백엔드

🚀 SELECT FOR UPDATE SKIP LOCKED로 효율적인 작업 큐 구현하기

twoweekhee 2025. 6. 9. 10:17

안녕하세요! 오늘은 데이터베이스의 고급 잠금 기능인 SELECT FOR UPDATE SKIP LOCKED에 대해 알아보겠습니다. 특히 Spring Boot 환경에서 어떻게 활용할 수 있는지 실제 코드와 함께 살펴보겠어요! 💪

📋 목차

  1. SELECT FOR UPDATE SKIP LOCKED란?
  2. 기본 동작 원리
  3. 주요 사용 사례
  4. Spring Boot에서 구현하기
  5. 실전 예제: 작업 큐 처리
  6. 성능 최적화 팁
  7. 주의사항

🤔 SELECT FOR UPDATE SKIP LOCKED란?

SELECT FOR UPDATE SKIP LOCKED는 PostgreSQL과 MySQL에서 제공하는 강력한 잠금 기능입니다.

일반적인 SELECT FOR UPDATE는 조회한 행들에 배타적 잠금을 걸어서 다른 트랜잭션이 해당 행을 수정하지 못하게 합니다. 하지만 여기에 SKIP LOCKED를 추가하면 마법 같은 일이 일어나죠! ✨

💡 핵심 아이디어: 이미 잠긴 행들은 건너뛰고, 사용 가능한 행들만 즉시 가져온다!

⚙️ 기본 동작 원리

기존 방식과 비교해보면 차이점이 명확해집니다:

일반적인 SELECT FOR UPDATE

-- 잠긴 행이 있으면 해제될 때까지 대기 😴
SELECT * FROM job_queue WHERE status = 'pending' LIMIT 1 FOR UPDATE;

SKIP LOCKED 추가

-- 잠긴 행은 건너뛰고 사용 가능한 행만 즉시 반환 🏃‍♂️
SELECT * FROM job_queue WHERE status = 'pending' LIMIT 1 FOR UPDATE SKIP LOCKED;

이렇게 하면 여러 워커가 동시에 실행되어도 서로 다른 작업을 처리할 수 있어요!

📚 주요 사용 사례

1. 작업 큐 처리

-- 처리 대기 중인 작업을 안전하게 가져오기
SELECT * FROM job_queue 
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED;

2. 배치 처리

-- 여러 워커가 동시에 다른 주문 처리
SELECT * FROM orders 
WHERE status = 'new'
LIMIT 100
FOR UPDATE SKIP LOCKED;

🔧 Spring Boot에서 구현하기

자, 이제 실제로 Spring Boot에서 어떻게 구현하는지 살펴볼까요?

JPA Native Query 방식

Java 버전:

@Repository
public interface JobRepository extends JpaRepository<Job, Long> {
    
    @Query(value = "SELECT * FROM jobs WHERE status = 'PENDING' " +
                   "ORDER BY created_at LIMIT :limit FOR UPDATE SKIP LOCKED", 
           nativeQuery = true)
    List<Job> findPendingJobsForUpdate(@Param("limit") int limit);
}

@Service
@Transactional
public class JobProcessingService {
    
    @Autowired
    private JobRepository jobRepository;
    
    public List<Job> claimJobs(int batchSize) {
        List<Job> jobs = jobRepository.findPendingJobsForUpdate(batchSize);
        
        // 상태 변경으로 중복 처리 방지
        jobs.forEach(job -> job.setStatus(JobStatus.PROCESSING));
        jobRepository.saveAll(jobs);
        
        return jobs;
    }
}

Kotlin 버전:

@Repository
interface JobRepository : JpaRepository<Job, Long> {
    
    @Query(
        value = """
            SELECT * FROM jobs 
            WHERE status = 'PENDING' 
            ORDER BY created_at 
            LIMIT :limit 
            FOR UPDATE SKIP LOCKED
        """, 
        nativeQuery = true
    )
    fun findPendingJobsForUpdate(@Param("limit") limit: Int): List<Job>
}

@Service
@Transactional
class JobProcessingService(
    private val jobRepository: JobRepository
) {
    
    fun claimJobs(batchSize: Int): List<Job> {
        val jobs = jobRepository.findPendingJobsForUpdate(batchSize)
        
        jobs.forEach { it.status = JobStatus.PROCESSING }
        jobRepository.saveAll(jobs)
        
        return jobs
    }
}

JDBC Template 방식

더 세밀한 제어가 필요하다면 JDBC Template을 사용할 수도 있어요:

@Repository
class JobRepositoryCustom(
    private val jdbcTemplate: JdbcTemplate
) {
    
    @Transactional
    fun claimPendingJobs(limit: Int): List<Job> {
        val sql = """
            SELECT id, title, status, created_at 
            FROM jobs 
            WHERE status = 'PENDING' 
            ORDER BY created_at 
            LIMIT ? 
            FOR UPDATE SKIP LOCKED
        """
        
        return jdbcTemplate.query(sql, arrayOf(limit)) { rs, _ ->
            Job(
                id = rs.getLong("id"),
                title = rs.getString("title"),
                status = JobStatus.valueOf(rs.getString("status")),
                createdAt = rs.getTimestamp("created_at").toLocalDateTime()
            )
        }
    }
}

🛠️ 실전 예제: 작업 큐 처리

실제 운영 환경에서 사용할 수 있는 완전한 예제를 만들어보겠습니다:

엔티티 정의

@Entity
data class Job(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,
    
    val title: String,
    val payload: String,
    
    @Enumerated(EnumType.STRING)
    var status: JobStatus,
    
    val createdAt: LocalDateTime = LocalDateTime.now(),
    var processedAt: LocalDateTime? = null
)

enum class JobStatus {
    PENDING, PROCESSING, COMPLETED, FAILED
}

워커 구현

@Component
class JobWorker(
    private val jobProcessingService: JobProcessingService
) {
    private val logger = LoggerFactory.getLogger(JobWorker::class.java)
    
    @Scheduled(fixedDelay = 5000) // 5초마다 실행
    fun processJobs() {
        val jobs = jobProcessingService.claimJobs(10) // 10개씩 처리
        
        if (jobs.isNotEmpty()) {
            logger.info("🎯 ${jobs.size}개의 작업을 처리합니다.")
            
            jobs.forEach { job ->
                GlobalScope.launch {
                    processJob(job)
                }
            }
        }
    }
    
    private suspend fun processJob(job: Job) {
        try {
            logger.info("📋 작업 처리 중: ${job.title}")
            
            // 실제 업무 로직 (예: API 호출, 파일 처리 등)
            delay(1000) 
            
            jobProcessingService.completeJob(job.id)
            logger.info("✅ 작업 완료: ${job.title}")
            
        } catch (e: Exception) {
            logger.error("❌ 작업 실패: ${job.title}", e)
            jobProcessingService.failJob(job.id, e.message ?: "Unknown error")
        }
    }
}

🚀 성능 최적화 팁

1. 적절한 트랜잭션 격리 수준 설정

@Transactional(isolation = Isolation.READ_COMMITTED)
public List<Job> claimJobs(int batchSize) {
    // READ_COMMITTED에서 가장 효과적으로 동작
    return jobRepository.findPendingJobsForUpdate(batchSize);
}

2. 커넥션 풀 최적화

# application.yml
spring:
  datasource:
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
  jpa:
    properties:
      hibernate:
        jdbc:
          batch_size: 20
        order_inserts: true
        order_updates: true

3. 인덱스 최적화

-- 효율적인 조회를 위한 복합 인덱스
CREATE INDEX idx_jobs_status_created 
ON jobs(status, created_at) 
WHERE status = 'PENDING';

⚠️ 주의사항

1. 결과 집합 크기 예측 불가

SKIP LOCKED를 사용하면 예상보다 적은 수의 행이 반환될 수 있어요. 이는 다른 프로세스에서 이미 잠금을 획득했기 때문입니다.

2. 완전성 보장의 어려움

일부 행이 건너뛰어질 수 있어서, 모든 데이터를 빠짐없이 처리해야 하는 경우에는 추가적인 로직이 필요합니다.

3. 데이터베이스 지원 확인

PostgreSQL 9.5+, MySQL 8.0+에서만 지원되니까 버전을 꼭 확인하세요!

🎯 마무리

SELECT FOR UPDATE SKIP LOCKED는 현대적인 분산 시스템에서 매우 유용한 기능입니다. 특히 마이크로서비스 아키텍처에서 여러 인스턴스가 동시에 작업을 처리할 때 그 진가를 발휘하죠!

적절히 활용하면 동시성을 크게 향상시키고 데드락을 방지할 수 있어서, 시스템의 전반적인 성능과 안정성을 높일 수 있습니다. 🎉

여러분도 한번 시도해보시고, 궁금한 점이 있다면 언제든 댓글로 남겨주세요!


🏷️ 태그

#PostgreSQL #MySQL #SpringBoot #JPA #Kotlin #Java #Database #Concurrency #JobQueue #SkipLocked #SelectForUpdate #분산처리 #동시성 #성능최적화 #백엔드개발