Java

비동기 작업 및 동시성

Dean83 2025. 12. 31. 11:59

사실, 간단히 검색하면 비동기 동작을 위한 자료형이 뭔지 알 수 있는데 기억 하는 차원에서 작성해 둔다. 

요약을 하자면, 

  • 스레드풀 개수 정의
  • 비동기 작업 정의 및 실행
    • 스레드풀 반영 
    • 비동기 작업 완료시 체이닝을 통한 후속작업 연계
      • 예외처리 포함
    • 다수의 작업을 묶어서 한번에 체이닝을 연계할 수 있다.
      • thenAccept 를 통해 바로 결과값 처리를 할 수 있다.
  • 비동기 작업 결과값 확인을 위해 대기 (블로킹)
    • thenAccept를 사용하지 않은 경우

 

리턴이 없는 단순 비동기 작업 정의 및 실행 예

...

CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {

});
  • ComputableFuture <리턴 자료형> 으로 정의 한다.
  • runAsync 는 비동기 작업을 실행한다.

 

리턴이 있는 비동기 정의 및 실행 예

...

CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> "hello");
    
...
  • supplyAsync 를 이용해 리턴이 있는 비동기 작업을 실행한다.

 

 

하나의 비동기 작업을 실행하고, 결과값을 통해 체이닝을 연결 할 수 있다. 

...

CompletableFuture
    .supplyAsync(() -> 10)
    .thenApply(n -> n * 2)
    .handle((result, ex) -> {
        if (ex != null) {
            System.out.println("예외 처리: " + ex.getMessage());
            return -1; // fallback
        }
        return result;
    })
    .thenAccept(System.out::println);
    
...
  • thenApply : 비동기 작업의 결과값을 가지고 후속 작업 시행
    • 다수의 비동기 작업의 결과를 thenApply로 처리 할 수 있다. (아래에 예 추가) 
    • thenCompose : thenApply 대신 사용하는 것으로, 하나의 비동기 작업 후, 또다른 비동기 작업을 호출할 때 사용한다. 
  • thenAccept : apply 작업 후 최종 결과물을 통해 작업 시행. 이 경우 결과값 리턴이 없기 떄문에 join으로 반환값을 가져올 수 없다
  • handle : 성공/실패 모두 콜백된다. 즉, 예외가 발생하였을 때에도, 작업 성공했을 때에도 이곳에 진입된다.

 

비동기 작업 후 결과를 통해 다른 비동기 작업 수행

import java.util.concurrent.CompletableFuture;

public class ThenComposeExample {

    public static void main(String[] args) {

        CompletableFuture<String> result =
            CompletableFuture
                .supplyAsync(() -> "userId")
                .thenCompose(id -> findUserAsync(id));

        String user = result.join();
        System.out.println(user);
    }

    static CompletableFuture<String> findUserAsync(String id) {
        return CompletableFuture.supplyAsync(() -> "User(" + id + ")");
    }
}
  • thenCompose 를 통해 다른 비동기 작업을 수행한다.

 

정의된 다수의 비동기 결과값을 처리하는 예. 

  • 단건의 비동기 동작에 사용한 체이닝을 이곳에서 사용 할 수 있다. 
CompletableFuture.allOf(...)

CompletableFuture.anyOf(...)

실제 예
CompletableFuture.allOf(f1, f2)
    .thenApply(v -> {
        String r1 = f1.join();
        String r2 = f2.join();
        return r1 + r2;
    });
  • allOf : 모든 작업이 완료 되었을때 후속 처리 진행
  • anyOf : 모든 작업 중 하나라도 완료 되면, 후속 처리 진행

 

비동기 실행 완료 후 결과를 꺼내는 메소드

CompletableFuture.allOf(task1, ....).join();
CompletableFuture.allOf(task1, ....).get();

이후, task1.join(), task2.join() 으로 각각 결과를 받아야와햠
  • 둘 다 실행 결과를 받으며, 동기 작업 (블로킹) 이다.  즉, 비동기 작업을 실행하고 완료될 때까지 여기서 대기하게 된다.
  • get 의 경우, 개발자에게 예외처리가 강제 되고, (try-catch 사용) join의 경우 그렇지 않다. 
  • 보통 join을 많이 사용한다.
  • thenApply() 를 앞에서 호출했다면, 값을 가져올 수 없다.

 

스레드풀 생성 후 지정

ExecutorService executor =
    Executors.newFixedThreadPool(10);

CompletableFuture.supplyAsync(() -> work(), executor);
  • 스레드풀 개수를 정의하고, 이 스레드풀 내에서 비동기 작업을 수행하도록 한다.

 

전반적인 예는 다음과 같다. 

import java.util.concurrent.*;

public class CompleteExample {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 비동기 작업 정의 + 즉시 실행
        CompletableFuture<Integer> f1 =
            CompletableFuture.supplyAsync(() -> {
                sleep(500);
                return 10;
            }, executor);

        CompletableFuture<Integer> f2 =
            CompletableFuture.supplyAsync(() -> {
                sleep(300);
                return 20;
            }, executor);

        // 완료 조건 결합 + 완료 후 처리 정의
        CompletableFuture<Integer> combined =
            CompletableFuture
                .allOf(f1, f2)
                .thenApply(v -> {
                    // 이 시점에서는 f1, f2 모두 완료됨
                    return f1.join() + f2.join();
                });

        // 결과 대기 (동기화 지점)
        Integer result = combined.join();

        System.out.println("결과: " + result);

        executor.shutdown();
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

 

 

동시성 문제 관련

여러 스레드에서 동시에 데이터 접근 및 변경 하게되면 동시성 문제를 겪게 된다. 해결책은 여러개가 있다.

 

Synchronized

  • 보통 비동기 작업을 할때에 동시성 문제가 생길 수 있고, 이를 위해서 성능에서 조금 손해 보더라도 lock을 걸어서  문제를 해결한다.
  • Atomic 자료형을 사용하는것도 방법이 되겠고, Synchronized 를 이용할 수도 있다. 
  • 메서드 안에서 동시성 문제가 발생할것 같은 구간에만 Syncronized 를 이용하는것이 좋다.
public void methodA()
{

	....
    synchronized(this)
    {
    	//동시성 문제가 발생할 코드만 여기에 추가
    }
    
    ...

}

 

 

ReadWriteLock

  • 읽기가 많을때 성능상 유리하다.
  • 읽기는 동시에 읽을 수 있으나, 쓸때에는 락을 건다.
ReadWriteLock lock = new ReentrantReadWriteLock();

lock.readLock().lock();
try {
    // 읽기
} finally {
    lock.readLock().unlock();
}

lock.writeLock().lock();
try {
    // 쓰기
} finally {
    lock.writeLock().unlock();
}

 

 

Atomic 자료형

  • 해당 객체 한정으로 동시성 문제를 해결 할 수 있다. 
AtomicInteger int 원자 연산
AtomicLong long
AtomicBoolean flag
AtomicReference<T> 객체 참조
...

AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();

...

 

AtomicReference 의 예를 보면 다음과 같다. 

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample {

    public static void main(String[] args) throws InterruptedException {

        AtomicReference<Counter> ref =
            new AtomicReference<>(new Counter(0));

        Runnable task = () -> {
            for (int i = 0; i < 1_000; i++) {
                ref.updateAndGet(old ->
                    new Counter(old.value + 1)
                );
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println(ref.get().value); // 2000
    }

    static class Counter {
        final int value;

        Counter(int value) {
            this.value = value;
        }
    }
}

 

Concurrent컬랙션

ConcurrentHashMap 동시 접근 안전
CopyOnWriteArrayList 읽기 많을 때
ConcurrentLinkedQueue lock-free 큐

 

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapExample {

    public static void main(String[] args) {

        ConcurrentHashMap<String, Integer> map =
            new ConcurrentHashMap<>();

        map.put("a", 1);

        map.compute("a", (k, v) -> v + 1);

        System.out.println(map.get("a")); // 2
    }
}
  • map.put 은 동시성을 예방하지 못한다. 따라서 단순 추가에서만 사용한다.
  • map.compute 는 기존값에 대한 연산이 필요할때, 그리고 동시성을 방지하고자 할때 사용한다.

'Java' 카테고리의 다른 글

Atomic 자료형  (0) 2025.12.12
Optional.ofNullable 로 null-safe 처리 하기  (0) 2025.11.04
파일 쓰고, 읽기 예제  (0) 2025.10.02
Checked, Unchecked Exception  (0) 2025.10.01
Stream API  (0) 2025.09.10