옵저버 패턴을 적용하여 구독자에게 변경사항을 알려주는 programming 기법.
하나의 값 보다는 data stream을 반환한다.
RxJava의 푸시 메커니즘과 풀 메커니즘 비교
RxKotlin은 프로그램에서 사용되는 반복자 패턴(Iterator)의 풀 메커니즘 대신 푸시 메커니즘의 데이터 / 이벤트 시스템으로 대표되는 옵저버블 패턴을 중심으로 작동한다. 여기서 Iterator 패턴은 아래와 같은 로직을 의미한다.
// iterator 방식
fun main(args: Array<String>){
var list: List<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f)
var iterator = list.iterator()
while(iterator.hasNext()){
println(iterator.next())
}
}
알아두어야 할 점이 데이터가 사용 준비 될 때 까지 기존 스레드는 블로킹 상태가 된다. 즉 네트워크 호출이나 데이터베이스 쿼리를 이용해서 데이터를 가져온다면 스레드가 얼마나 블로킹 돼있을지 생각 해 보자. 위와 같은 방법은 그다지 효율적이지 않은 코드라고 할 수 있겠다.
observable 함수의 종류
onNext(item T)
: 값을 전달할 때 호출하여 값을 넘겨준다.
onError(e: Throwable)
: 에러가 발생할 경우에 호출.
onSubscribe(d: Disposable)
: 구독을 신청하면 호출해준다.
onComplete()
: 가지고 있는 값을 모두 전달하면 호출한다.
val observer: Observer<String> = object : Observer<String>{
override fun onComplete() { println("onComplete()") }
override fun onNext(item: T) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $e") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}
1 Observable.create을 이용하여 Observable을 생성하는 방법
fun main(args: Array<String>){
val observable1 = Observable.create{
it.onNext(1)
it.onNext(2)
it.onComplete()
}
val observable2 = Observable.create{
it.onNext(1)
it.onNext(2)
it.onError()
}
val observer: Observer<String> = object : Observer<String>{
override fun onComplete() { println("onComplete()") }
override fun onNext(item: T) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $e") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}
observable1.subscribe(observer)
observable2.subscribe(observer)
}
2 Observable.from~~~을 이용하여 Observable을 생성하는 방법
fun main(args: Array<String>){
val list = listOf(1, 2, 3)
val listOb = Observable.fromIterable(list)
val call = Callable<int> { 4 }
val callOb = Observable.fromCallable(call)
val future = object: Future<Int>{
override fun get() = 5
override fun get(timeout: Long, unit: Timeout) = 6
override fun isDone() = true
override fun isCancellable() = false
}
val futureOb = Observable.fromFuture(future)
val observer: Observer<Int> = object : Observer<Int> {
override fun onComplete() { println("onComplete()") }
override fun onNext(item: T) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $e") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}
listOb.subscribe(observer)
callOb.subscribe(observer)
futureOb.subscribe(observer)
}
from 함수의 다양한 형태
fromIterable()
: list처럼 iterable을 지원하는 instance를 Observable 형태로 변환해준다.
- 각 개별 아이템이 하나씩 전달 됨.
fromCallable()
: Callable 객체를 Observable 형태로 전달한다. - call() 함수의 return 값이 전달 된다.
fromFuture()
: Future 객체를 Observable 형태로 변환한다. - get() 함수의 return 값이 전달된다.
from()
:Iterable
또는 배열의 아이템을 순차적으로 발행하는 옵저버블을 생성한다. 예를들어 Observable.from(list) list 배열이 가지고 있는 요소들을 순차적으로 발행한다.
3 Observable.just를 이용하여 Observable을 생성하는 방법
fun main(args: Array<String>){
val list = listOf(1, 2, 3)
val num = 3
val str = "wow!"
val map = mapOf(1 to "one", 2 to "two")
val justOb = Observable.just(list, num, str, map)
val observer: Observer<Any> = object : Observer<Any>{
override fun onComplete() { println("onComplete()") }
override fun onNext(item: T) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $e") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}
justOb.subscribe(observer)
}
just는 인자로 들어온 모든 타입의 요소들을 전부 한꺼번에 반환 해 준다. list처럼 각 인덱스 값이 따로따로 필요하다면 fromIterable()을 사용해야 한다. 즉, List 타입의 요소를 사용하려면 fromIteravle()을 사용 하라는 것이다.
range
val observer: Observer<Any> = object : Observer<Any>{
override fun onComplete() { println("onComplete()") }
override fun onNext(item: T) { println("onNext() - $item") }
override fun onError(e: Throwable) { println("onError() - $e") }
override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}
Observable.range(1,3).subscribe(observer)
위와 같이 range는 특정 범위 만큼 수를 생성하여 전달한다.
empty
val observer = object : Observer{ ... }
Observable.empty().subscribe(observer)
empty는 어떠한 값을 전달하지는 않지만 onComplete()를 호출 해 준다. 아 물론 observer가 호출될 때 실행되는 onSubscribe() 함수가 먼저 실행된다.
interval
val observer = object : Observer<Long>{ ... }
Thread().start() // 여기부터 thread 시작
val th1 = Thread(){ Thread.sleep(300) }
th1.start()
th1.join()
interval은 단어 뜻 그대로 시간차 간격을 두고 다음 로직을 처리하는 연산자이다.
never
아무런 아이템도 발행하지 않고, 완료도 발행하지 않는 옵저버블을 생성한다. 결과 값으로는 onNest, onComplete, onSubscribe, onError 이 중에 아무것도 실행되지 않는다.