사실, 간단히 검색하면 비동기 동작을 위한 자료형이 뭔지 알 수 있는데 기억 하는 차원에서 작성해 둔다.
요약을 하자면,
- 스레드풀 개수 정의
- 비동기 작업 정의 및 실행
- 스레드풀 반영
- 비동기 작업 완료시 체이닝을 통한 후속작업 연계
- 예외처리 포함
- 다수의 작업을 묶어서 한번에 체이닝을 연계할 수 있다.
- thenAccept 를 통해 바로 결과값 처리를 할 수 있다.
- 비동기 작업 결과값 확인을 위해 대기 (블로킹)
- thenAccept를 사용하지 않은 경우
리턴이 없는 단순 비동기 작업 정의 및 실행 예
...
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
});
- ComputableFuture <리턴 자료형> 으로 정의 한다.
- runAsync 는 비동기 작업을 실행한다.
리턴이 있는 비동기 정의 및 실행 예
...
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> "hello");
...
- supplyAsync 를 이용해 리턴이 있는 비동기 작업을 실행한다.
하나의 비동기 작업을 실행하고, 결과값을 통해 체이닝을 연결 할 수 있다.
...
CompletableFuture
.supplyAsync(() -> 10)
.thenApply(n -> n * 2)
.handle((result, ex) -> {
if (ex != null) {
System.out.println("예외 처리: " + ex.getMessage());
return -1; // fallback
}
return result;
})
.thenAccept(System.out::println);
...
- thenApply : 비동기 작업의 결과값을 가지고 후속 작업 시행
- 다수의 비동기 작업의 결과를 thenApply로 처리 할 수 있다. (아래에 예 추가)
- thenCompose : thenApply 대신 사용하는 것으로, 하나의 비동기 작업 후, 또다른 비동기 작업을 호출할 때 사용한다.
- thenAccept : apply 작업 후 최종 결과물을 통해 작업 시행. 이 경우 결과값 리턴이 없기 떄문에 join으로 반환값을 가져올 수 없다
- handle : 성공/실패 모두 콜백된다. 즉, 예외가 발생하였을 때에도, 작업 성공했을 때에도 이곳에 진입된다.
비동기 작업 후 결과를 통해 다른 비동기 작업 수행
import java.util.concurrent.CompletableFuture;
public class ThenComposeExample {
public static void main(String[] args) {
CompletableFuture<String> result =
CompletableFuture
.supplyAsync(() -> "userId")
.thenCompose(id -> findUserAsync(id));
String user = result.join();
System.out.println(user);
}
static CompletableFuture<String> findUserAsync(String id) {
return CompletableFuture.supplyAsync(() -> "User(" + id + ")");
}
}
- thenCompose 를 통해 다른 비동기 작업을 수행한다.
정의된 다수의 비동기 결과값을 처리하는 예.
- 단건의 비동기 동작에 사용한 체이닝을 이곳에서 사용 할 수 있다.
CompletableFuture.allOf(...)
CompletableFuture.anyOf(...)
실제 예
CompletableFuture.allOf(f1, f2)
.thenApply(v -> {
String r1 = f1.join();
String r2 = f2.join();
return r1 + r2;
});
- allOf : 모든 작업이 완료 되었을때 후속 처리 진행
- anyOf : 모든 작업 중 하나라도 완료 되면, 후속 처리 진행
비동기 실행 완료 후 결과를 꺼내는 메소드
CompletableFuture.allOf(task1, ....).join();
CompletableFuture.allOf(task1, ....).get();
이후, task1.join(), task2.join() 으로 각각 결과를 받아야와햠
- 둘 다 실행 결과를 받으며, 동기 작업 (블로킹) 이다. 즉, 비동기 작업을 실행하고 완료될 때까지 여기서 대기하게 된다.
- get 의 경우, 개발자에게 예외처리가 강제 되고, (try-catch 사용) join의 경우 그렇지 않다.
- 보통 join을 많이 사용한다.
- thenApply() 를 앞에서 호출했다면, 값을 가져올 수 없다.
스레드풀 생성 후 지정
ExecutorService executor =
Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> work(), executor);
- 스레드풀 개수를 정의하고, 이 스레드풀 내에서 비동기 작업을 수행하도록 한다.
전반적인 예는 다음과 같다.
import java.util.concurrent.*;
public class CompleteExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 비동기 작업 정의 + 즉시 실행
CompletableFuture<Integer> f1 =
CompletableFuture.supplyAsync(() -> {
sleep(500);
return 10;
}, executor);
CompletableFuture<Integer> f2 =
CompletableFuture.supplyAsync(() -> {
sleep(300);
return 20;
}, executor);
// 완료 조건 결합 + 완료 후 처리 정의
CompletableFuture<Integer> combined =
CompletableFuture
.allOf(f1, f2)
.thenApply(v -> {
// 이 시점에서는 f1, f2 모두 완료됨
return f1.join() + f2.join();
});
// 결과 대기 (동기화 지점)
Integer result = combined.join();
System.out.println("결과: " + result);
executor.shutdown();
}
static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
동시성 문제 관련
여러 스레드에서 동시에 데이터 접근 및 변경 하게되면 동시성 문제를 겪게 된다. 해결책은 여러개가 있다.
Synchronized
- 보통 비동기 작업을 할때에 동시성 문제가 생길 수 있고, 이를 위해서 성능에서 조금 손해 보더라도 lock을 걸어서 문제를 해결한다.
- Atomic 자료형을 사용하는것도 방법이 되겠고, Synchronized 를 이용할 수도 있다.
- 메서드 안에서 동시성 문제가 발생할것 같은 구간에만 Syncronized 를 이용하는것이 좋다.
public void methodA()
{
....
synchronized(this)
{
//동시성 문제가 발생할 코드만 여기에 추가
}
...
}
ReadWriteLock
- 읽기가 많을때 성능상 유리하다.
- 읽기는 동시에 읽을 수 있으나, 쓸때에는 락을 건다.
ReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
try {
// 읽기
} finally {
lock.readLock().unlock();
}
lock.writeLock().lock();
try {
// 쓰기
} finally {
lock.writeLock().unlock();
}
Atomic 자료형
- 해당 객체 한정으로 동시성 문제를 해결 할 수 있다.
| AtomicInteger | int 원자 연산 |
| AtomicLong | long |
| AtomicBoolean | flag |
| AtomicReference<T> | 객체 참조 |
...
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();
...
AtomicReference 의 예를 보면 다음과 같다.
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceExample {
public static void main(String[] args) throws InterruptedException {
AtomicReference<Counter> ref =
new AtomicReference<>(new Counter(0));
Runnable task = () -> {
for (int i = 0; i < 1_000; i++) {
ref.updateAndGet(old ->
new Counter(old.value + 1)
);
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(ref.get().value); // 2000
}
static class Counter {
final int value;
Counter(int value) {
this.value = value;
}
}
}
Concurrent컬랙션
| ConcurrentHashMap | 동시 접근 안전 |
| CopyOnWriteArrayList | 읽기 많을 때 |
| ConcurrentLinkedQueue | lock-free 큐 |
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map =
new ConcurrentHashMap<>();
map.put("a", 1);
map.compute("a", (k, v) -> v + 1);
System.out.println(map.get("a")); // 2
}
}
- map.put 은 동시성을 예방하지 못한다. 따라서 단순 추가에서만 사용한다.
- map.compute 는 기존값에 대한 연산이 필요할때, 그리고 동시성을 방지하고자 할때 사용한다.
'Java' 카테고리의 다른 글
| Atomic 자료형 (0) | 2025.12.12 |
|---|---|
| Optional.ofNullable 로 null-safe 처리 하기 (0) | 2025.11.04 |
| 파일 쓰고, 읽기 예제 (0) | 2025.10.02 |
| Checked, Unchecked Exception (0) | 2025.10.01 |
| Stream API (0) | 2025.09.10 |