2024년 7월 29일 작성
Reactive Programming - 비동기적으로 Data Stream을 다루는 방법
Reactive Programming은 data stream과 변경 사항 전파를 중심으로하는 비동기 programming paradigm입니다.
Reactive Programming : Data 변화에 반응하여 비동기적으로 처리하기
- Reactive Programming은 data의 흐름과 변화에 대응하여 비동기적으로 data를 처리하는 programming paradigm입니다.
- event 기반 system을 구축하는 데 유용하며, 특히 사용자 interface(UI)와 같은 실시간 data stream을 처리하는데 적합합니다.
- 주요 framework 및 library로 Rx(ReactiveX), Project Reactor, Akka Streams 등이 있습니다.
Reactive System의 특징
- 비동기성 (Asynchrony) : reactive system은 event 또는 data stream을 비동기적으로 처리합니다.
- 이를 통해 다른 작업을 동시에 수행하거나 blocking을 피할 수 있습니다.
- 반응성 (Responsiveness) : reactive system은 실시간으로 data의 변화에 반응합니다.
- 사용자 요청이나 외부 event에 빠르게 응답할 수 있습니다.
- 탄력성 (Elasticity) : reactive system은 부하나 실패에 유연하게 대응할 수 있습니다.
- system의 자원을 동적으로 조절하여 확장성과 가용성을 제공합니다.
- Message 기반 (Messaging) : reactive system은 message 기반 architecture를 기반으로 동작합니다.
- component 간에 비동기적으로 message를 교환하여 상호작용합니다.
Reactive Programming의 주요 개념
- Data Stream : 시간에 따라 변하는 값의 연속적인 흐름을 나타냅니다.
- 예를 들어, mouse click event, sensor data, web socket message 등이 data stream이 될 수 있습니다.
- Observer Pattern : Subject(주체)와 Observer(관찰자)의 관계를 정의합니다.
- Subject는 data의 변화를 감지하고, Observer는 그 변화를 받아 처리합니다.
- Reactive Programming에서는 주로 Observable(주체)과 Observer(관찰자)의 형태로 구현됩니다.
- 연산자 : 연산자(operator)는 data stream을 변환하고 조작하는 함수들입니다.
map
,filter
,reduce
등의 연산자를 통해 stream의 data를 원하는 형태로 가공할 수 있습니다.
- 비동기 처리 : Reactive Programming은 비동기 event를 처리하는 데 중점을 둡니다.
- callback이나 promise를 사용하는 대신, stream을 통해 data의 흐름을 관리합니다.
Reactive Programming의 장점
- 성능 및 확장성 : 비동기 event 처리를 통해 system의 확장성과 성능을 높일 수 있습니다.
- 다수의 event를 동시에 처리할 수 있으며, 필요에 따라 system을 병렬로 확장할 수 있습니다.
- 응답성 : data의 변화에 실시간으로 반응하여 빠른 응답 시간을 제공합니다.
- 사용자 경험을 향상시키는 데 도움이 됩니다.
- 유지 보수성 : data 흐름과 처리가 명확하게 정의됩니다.
- code의 가독성과 유지 보수성이 향상됩니다.
- 장애 처리와 회복력 : 오류를 격리시키고, 다른 component에 영향을 주지 않으면서 정상 동작을 유지할 수 있습니다.
- 장애가 발생하더라도 탄력적으로 대응할 수 있습니다.
Rx : ReactiveX : Reactive eXtentions
- Rx(ReactiveX)는 Reactive Programming의 원칙과 pattern을 구현한 library 집합입니다.
- 비동기적인 event 기반 programming을 위한 pattern과 도구를 제공하며, data stream과 event를 효과적으로 처리할 수 있도록 도와줍니다.
- 복잡한 비동기 작업을 단순화하고, system의 응답성과 확장성을 향상시킬 수 있습니다.
- Rx는 다양한 언어와 platform에서 범용적으로 사용할 수 있도록 여러 구현체가 있으며, 대표적으로 RxJava, RxJS, RxPY 등이 있습니다.
- 이 구현체를 사용함으로써 Reactive Programming을 쉽게 구현할 수 있습니다.
Rx의 주요 구성
- Observable (or Flowable) : data stream을 나타내며, 이 stream은 시간이 지남에 따라 data를 발행합니다.
- data는 사용자 입력, network 응답, file 읽기 등의 다양한 source에서 생성될 수 있습니다.
- Observer (or Subscriber) : Observable을 구독하여 data stream의 변화에 반응합니다.
- Observable은 data를 발행할 때마다 Observer의
onNext
,onError
,onComplete
method를 호출합니다.
- Observable은 data를 발행할 때마다 Observer의
- Operator : data stream을 변환하거나 조작하는 함수들입니다.
- 예를 들어,
map
,filter
,reduce
,merge
등의 연산자를 사용하여 stream의 data를 원하는 형태로 가공할 수 있습니다.
- 예를 들어,
- Scheduler : Rx는 Scheduler를 통해 비동기 작업을 관리하고, thread pool을 활용하여 병렬 처리를 지원합니다.
- Scheduler로 비동기 작업을 실행할 thread 또는 thread pool을 지정합니다.
- scheduling 기능을 사용하여 비동기 작업의 실행 context를 제어할 수 있습니다.
- 예를 들어, I/O 작업, network 호출, database query 등과 같은 비동기 작업을 효과적으로 처리할 수 있습니다.
- Scheduler를 사용하여 작업을 지연시키거나 주기적으로 반복 실행할 수도 있습니다.
Rx의 장점
- 유연성 : 다양한 source의 data를 stream으로 처리할 수 있습니다.
- 가독성 : 비동기 code를 동기 code처럼 작성할 수 있어 가독성이 높아집니다.
- 유지 보수성 : data 흐름과 처리가 명확하게 정의되어 있어 유지 보수성이 향상됩니다.
- 오류 처리 : stream 내에서 발생하는 오류를 일관되게 처리할 수 있습니다.
- 범용성 : 다양한 언어와 platform에 대한 구현체를 지원하여, 여러 환경에서 같은 spec으로 기능을 사용할 수 있습니다.
RxJava 사용 예시
import io.reactivex.Observable;
public class RxJavaExample {
public static void main(String[] args) {
// Observable 생성
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});
// Observer 생성 및 구독
observable.subscribe(
item -> System.out.println("Next : " + item), // onNext
error -> System.err.println("Error : " + error), // onError
() -> System.out.println("Completed") // onComplete
);
}
}
RxJS 사용 예시
const { Observable } = rxjs;
// Observable 생성
const observable = new Observable(subscriber => {
subscriber.next('Hello');
subscriber.next('RxJS');
subscriber.complete();
});
// Observer 생성 및 구독
observable.subscribe({
next(x) { console.log('Next : ' + x); }, // onNext
error(err) { console.log('Error : ' + err); }, // onError
complete() { console.log('Completed'); } // onComplete
});
RxPY 사용 예시
import rx
from rx import operators as ops
# Observable 생성
observable = rx.create(lambda observer, scheduler: [
observer.on_next("Hello"),
observer.on_next("RxPY"),
observer.on_completed()
])
# Observer 생성 및 구독
observable.subscribe(
on_next=lambda item: print(f"Next : {item}"), # onNext
on_error=lambda error: print(f"Error : {error}"), # onError
on_completed=lambda: print("Completed") # onComplete
)
RxJava : Java에서 Reactive Programming 구현하기
- RxJava는 Rx의 Java 구현체입니다.
- Rx의 개념을 Java에 적용하여, 선언적이고 병렬 처리가 가능한 Reactive Programming을 구현할 수 있도록 합니다.
RxJava 핵심 개념
-
Observable : data stream을 생성하고 발행하는 역할을 합니다.
-
Observer : Observable을 구독하여 발행된 data를 처리합니다.
- Operators : data stream을 변환하고 조작하는 함수들입니다.
- 예를 들어,
map
,filter
,flatMap
등이 있습니다.
- 예를 들어,
- Schedulers : 비동기 작업의 실행 context를 정의합니다.
- 예를 들어, IO 작업을 위한
Schedulers.io()
, UI thread를 위한AndroidSchedulers.mainThread()
등이 있습니다.
- 예를 들어, IO 작업을 위한
예제 1 : RxJava의 기본적인 사용법
- “Hello”와 “RxJava” 문자열을 발행하는
Observable
을 생성하고, 이를 구독하여 data를 처리하는Observer
를 정의합니다.- 구독이 시작되면
Observer
는onSubscribe
,onNext
,onComplete
method를 통해 data의 흐름(data stream)을 처리합니다.
- 구독이 시작되면
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class RxJavaBasicExample {
public static void main(String[] args) {
// Observable 생성
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onComplete();
});
// Observer 생성
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(String s) {
System.out.println("Next : " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("Error : " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
// Observable 구독
observable.subscribe(observer);
}
}
예제 2 : 연산자 사용
-
연산자를 사용하여 data stream 변환하고 조작할 수 있습니다.
-
연산자를 활용하여 data stream을 filtering하고 변환하는 예제입니다.
- 숫자 배열을
Observable
로 변환한 후, 짝수만 filtering하고 각 값을 2배로 변환하는 연산자를 적용합니다.- 결과는 비동기로 처리되어 출력됩니다.
Schedulers.io()
와Schedulers.single()
을 사용하여 작업을 background thread와 단일 thread에서 처리합니다.
- 숫자 배열을
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.observers.DisposableObserver;
public class RxJavaOperatorExample {
public static void main(String[] args) {
Observable<Integer> observable = Observable.fromArray(1, 2, 3, 4, 5)
.filter(num -> num % 2 == 0) // 짝수 filtering
.map(num -> num * 2) // 각 값을 2배로 변환
.subscribeOn(Schedulers.io()) // IO scheduler에서 실행
.observeOn(Schedulers.single()); // 단일 thread에서 관찰
observable.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("Next : " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("Error : " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// 잠시 대기하여 비동기 작업이 완료되도록 함
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
예제 3 : Network 요청 처리
-
RxJava는 network 요청과 같은 비동기 작업을 처리하는 데 매우 유용합니다.
-
RxJava를 사용하여 network 요청을 처리하는 예제입니다.
OkHttpClient
를 사용하여 GitHub API에 network 요청을 보내고, 그 결과를Observable
을 통해 처리합니다.Schedulers.io()
를 사용하여 network 요청을 background thread에서 처리합니다.AndroidSchedulers.mainThread()
를 사용하여 결과를 main thread에서 처리합니다.
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.observers.DisposableObserver;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class RxJavaNetworkExample {
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient();
Observable<String> observable = Observable.create(emitter -> {
try {
Request request = new Request.Builder()
.url("https://api.github.com")
.build();
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
emitter.onNext(response.body().string());
emitter.onComplete();
} else {
emitter.onError(new Exception("Network request failed"));
}
} catch (Exception e) {
emitter.onError(e);
}
});
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String response) {
System.out.println("Response : " + response);
}
@Override
public void onError(Throwable e) {
System.out.println("Error : " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// 잠시 대기하여 비동기 작업이 완료되도록 함
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
예제 4 : 실시간 주식 시장 Data 추적
- 실시간 주식 시장 data를 가져와 특정 주식의 가격 변화를 실시간으로 추적하고 사용자에게 update된 정보를 제공하는 web application 예제입니다.
- 실시간 data stream을 효율적으로 처리하고, data의 변화에 실시간으로 반응하는 reactive한 application입니다.
- Observable 생성 :
StockPriceFetcher
class에서Observable
을 생성합니다.create
method를 사용하여 새로운Observable
을 생성하고,emitter
를 통해 event를 발생시킵니다.- 여기서는
Timer
를 사용하여 주기적으로 주식 가격을 가져온 후,onNext
를 호출하여 stream에 event를 발생시킵니다.
- 주식 가격 Data 가져오기 : 주식 가격 data르 가져오는 method를 작성합니다.
fetchStockPrice
method는 실제로는 외부 API를 호출하여 주식 가격 data를 가져와야 합니다.- 여기서는 예시로 random 값을 사용합니다.
- Stream 구독 및 UI Update :
StockPriceUI
class에서Observable
을 구독합니다.subscribe
method를 사용하여 새로운 가격 data가 도착할 때마다 호출될 callback 함수를 전달합니다.- 이 함수에서는
updateStockPriceUI
method를 호출하여 가격 data를 UI에 update합니다.
1. Project 설정
- Maven 또는 Gradle을 사용하여 project에 RxJava library를 추가합니다.
<!-- Maven -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.0</version>
</dependency>
// Gradle
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
2. 주식 가격 Data를 가져오는 비동기 작업
- 주식 가격 data를 stream으로 가져오는 비동기 작업을 생성합니다.
- 이 작업은 일정 간격으로 주식 가격을 가져오고, stream에 event를 발생시킵니다.
import io.reactivex.rxjava3.core.Observable;
import java.util.Timer;
import java.util.TimerTask;
public class StockPriceFetcher {
public static Observable<Double> getStockPriceStream() {
return Observable.create(emitter -> {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 주식 가격을 가져오는 비동기 작업
double stockPrice = fetchStockPrice();
emitter.onNext(stockPrice);
} catch (Exception e) {
emitter.onError(e);
}
}
}, 0, 1000); // 1초마다 주식 가격 update
});
}
private static double fetchStockPrice() {
// 여기서는 예시로 random 값을 가져오지만, 실제로는 외부 API를 호출하는 등의 작업이 필요함
return 100 + Math.random() * 10;
}
}
3. 주식 가격 변화를 실시간으로 추적하고 UI Update
- 주식 가격 stream을 구독하여 새로운 가격 data가 도착할 때마다 callback 함수를 호출합니다.
- 이 함수에서 가격 data를 UI에 update할 수 있습니다.
public class StockPriceUI {
public static void main(String[] args) {
Observable<Double> stockPriceStream = StockPriceFetcher.getStockPriceStream();
stockPriceStream.subscribe(
price -> updateStockPriceUI(price),
error -> System.err.println("Error : " + error),
() -> System.out.println("Stock price stream completed")
);
// 계속 실행되도록 main thread 대기시키기
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void updateStockPriceUI(Double price) {
// 실제로는 UI를 update하는 code가 필요함
System.out.println("Updated Stock Price : " + price);
}
}