CompletableFuture로 블록체인 RPC 호출 5개를 동시에
CompletableFuture로 블록체인 RPC 호출 5개를 동시에
→ CompletableFuture는 Java에서 비동기 작업을 처리하기 위한 클래스다. 여러 작업을 동시에 시작하고, 각각의 결과를 나중에 합칠 수 있다.
2026년 2월 9일 — TX 응답속도 개선 및 비동기 병렬 처리 최적화 2026년 2월 10일 — 사전검증 RPC 병렬 조회 추가 적용
문제: 검증만 하는데 1초가 걸린다
토큰을 전송하기 전에 5가지를 확인해야 한다.
- 컨트랙트가 일시정지 상태인가?
- 보내는 계정이 동결됐는가?
- 받는 계정이 동결됐는가?
- 잔액이 충분한가?
- 현재 블록 타임스탬프는?
각각이 블록체인 RPC 호출이고, 한 번에 100200ms 걸린다.
→ RPC(Remote Procedure Call)란 네트워크를 통해 원격 서버의 함수를 호출하는 방식이다. 블록체인에서는 노드에 잔액 조회, 상태 확인 등을 요청할 때 쓴다. 순차로 5번이면 500ms1초. 전송 자체보다 검증이 더 오래 걸리는 상황이었다.
해결: 5개를 동시에 날리고, 모아서 판단
fun transferWithAuthorization(fromAddress: String, toAddress: String, amount: BigDecimal, ...) {
// 5개 검증을 동시에 시작
val pausedFuture = CompletableFuture.supplyAsync { isPaused() }
val frozenFromFuture = CompletableFuture.supplyAsync { isFrozen(fromAddress) }
val frozenToFuture = CompletableFuture.supplyAsync { isFrozen(toAddress) }
val balanceFuture = CompletableFuture.supplyAsync { balanceOf(fromAddress) }
val blockTimestampFuture = CompletableFuture.supplyAsync {
try {
web3j.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false)
.send().block.timestamp.toLong()
} catch (e: Exception) {
System.currentTimeMillis() / 1000 // 실패 시 서버 시간으로 대체
}
}
// 결과 수집 + 검증
if (joinUnwrap(pausedFuture)) throw IllegalStateException("컨트랙트 일시정지 상태")
if (joinUnwrap(frozenFromFuture)) throw IllegalStateException("송신자 계정 동결")
if (joinUnwrap(frozenToFuture)) throw IllegalStateException("수신자 계정 동결")
val balance = joinUnwrap(balanceFuture)
if (balance < amount) throw IllegalStateException("잔액 부족: $balance < $amount")
val blockTimestamp = joinUnwrap(blockTimestampFuture)
}
5개 호출이 동시에 나가고, 가장 느린 것 기준으로 ~200ms면 전부 끝난다. 순차 대비 약 2.5배 빨라졌다.
joinUnwrap: CompletionException 벗기기
CompletableFuture.join()은 내부 예외를 CompletionException으로 감싼다.
→ join()은 비동기 작업이 끝날 때까지 기다렸다가 결과를 반환하는 메서드다. 작업 중 에러가 나면 원래 예외를 CompletionException으로 한 번 더 감싸서 던지기 때문에, 원래 예외를 꺼내는 언래핑(unwrapping) 작업이 필요하다.
private fun <T> joinUnwrap(future: CompletableFuture<T>): T {
try {
return future.join()
} catch (e: CompletionException) {
throw e.cause ?: e // 원래 예외를 꺼내서 던짐
}
}
이게 없으면 "잔액 부족" 같은 비즈니스 예외가 CompletionException에 감싸져서, 에러 핸들러에서 제대로 분기되지 않는다.
배치 결제에서도 같은 패턴
여러 지갑의 잔액을 조회할 때, 같은 지갑이 여러 번 나올 수 있다 (같은 사용자의 다른 Leg). 중복 제거 후 병렬 조회:
val balanceMap = normalizedFromAddresses.distinct() // 중복 제거
.associateWith { address ->
CompletableFuture.supplyAsync { tokenContractService.balanceOf(address) }
}
.mapValues {
try { it.value.join() }
catch (e: CompletionException) { throw e.cause ?: e }
}
distinct()로 같은 주소는 한 번만 조회하고, 결과를 Map으로 저장해서 Leg별로 참조한다.
전용 스레드풀을 쓴 이유
@Bean(name = ["ioExecutor"])
fun ioExecutor(): ExecutorService {
return ThreadPoolExecutor(
4, 32, 60L, TimeUnit.SECONDS,
LinkedBlockingQueue(100),
{ r -> Thread(r, "io-executor").apply { isDaemon = true } },
ThreadPoolExecutor.CallerRunsPolicy()
)
}
CompletableFuture.supplyAsync { ... }는 기본적으로 ForkJoinPool.commonPool()을 쓴다.
→ ForkJoinPool.commonPool()은 JVM이 기본 제공하는 공용 스레드풀이다. CPU 코어 수에 맞춰 스레드가 제한되어 있어서, 계산 위주 작업에는 적합하지만 네트워크 대기가 긴 I/O 작업에는 적합하지 않다.
→ CPU 바운드는 연산이 주된 작업(예: 암호화), I/O 바운드는 네트워크/디스크 대기가 주된 작업(예: API 호출)을 뜻한다.
이건 CPU 바운드 작업용이라, I/O 대기(RPC 호출)로 스레드가 잠기면 다른 작업까지 영향을 받는다.
전용 ioExecutor를 만들어서:
- CPU 바운드와 I/O 바운드를 격리
- 최대 32개 스레드로 동시 RPC 호출 가능
- 큐가 가득 차면
CallerRunsPolicy로 호출자 스레드에서 직접 실행 (요청 거부 대신 느려지기) →CallerRunsPolicy는 스레드풀이 가득 찼을 때 작업을 거부하지 않고, 작업을 요청한 스레드가 직접 실행하게 하는 정책이다. 시스템이 멈추지 않고 속도만 느려진다.
blockTimestamp 조회 실패 시 fallback
val blockTimestampFuture = CompletableFuture.supplyAsync {
try {
web3j.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false)
.send().block.timestamp.toLong()
} catch (e: Exception) {
log.warn("블록 타임스탬프 조회 실패, 서버 시간 사용")
System.currentTimeMillis() / 1000
}
}
5개 중 4개가 성공했는데 타임스탬프 하나 때문에 전체가 실패하면 아까우니까, 서버 시간으로 대체한다. 어차피 validAfter/validBefore에 ±60초 여유를 두고 있어서 서버 시간으로도 충분하다.
배운 점
- 독립적인 I/O 호출이 여러 개면, 무조건 병렬화를 고려하자
CompletionException언래핑을 빼먹으면 에러 핸들링이 깨진다- 같은 데이터를 여러 번 조회하는 패턴이 보이면
distinct()+ Map 캐싱 - I/O 바운드 작업은 ForkJoinPool.commonPool() 말고 전용 스레드풀을 쓰자