옵저버 패턴을 적용하여 구독자에게 변경사항을 알려주는 programming 기법.

하나의 값 보다는 data stream을 반환한다.

RxJava의 푸시 메커니즘과 풀 메커니즘 비교

RxKotlin은 프로그램에서 사용되는 반복자 패턴(Iterator)의 풀 메커니즘 대신 푸시 메커니즘의 데이터 / 이벤트 시스템으로 대표되는 옵저버블 패턴을 중심으로 작동한다. 여기서 Iterator 패턴은 아래와 같은 로직을 의미한다.

// iterator 방식
fun main(args: Array<String>){
    var list: List<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f)
    var iterator = list.iterator()
    while(iterator.hasNext()){
        println(iterator.next())
    }
}

알아두어야 할 점이 데이터가 사용 준비 될 때 까지 기존 스레드는 블로킹 상태가 된다. 즉 네트워크 호출이나 데이터베이스 쿼리를 이용해서 데이터를 가져온다면 스레드가 얼마나 블로킹 돼있을지 생각 해 보자. 위와 같은 방법은 그다지 효율적이지 않은 코드라고 할 수 있겠다.


observable 함수의 종류

onNext(item T) : 값을 전달할 때 호출하여 값을 넘겨준다.

onError(e: Throwable) : 에러가 발생할 경우에 호출.

onSubscribe(d: Disposable) : 구독을 신청하면 호출해준다.

onComplete() : 가지고 있는 값을 모두 전달하면 호출한다.


val observer: Observer<String> = object : Observer<String>{
   override fun onComplete() { println("onComplete()") }
   override fun onNext(item: T) { println("onNext() - $item") }
   override fun onError(e: Throwable) { println("onError() - $e") }
   override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}

1 Observable.create을 이용하여 Observable을 생성하는 방법

fun main(args: Array<String>){
    val observable1 = Observable.create{
        it.onNext(1)
        it.onNext(2)
        it.onComplete()
    }

    val observable2 = Observable.create{
        it.onNext(1)
        it.onNext(2)
        it.onError()
    }

    val observer: Observer<String> = object : Observer<String>{
        override fun onComplete() { println("onComplete()") }
        override fun onNext(item: T) { println("onNext() - $item") }
        override fun onError(e: Throwable) { println("onError() - $e") }
        override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
    }

    observable1.subscribe(observer)
    observable2.subscribe(observer)
}

2 Observable.from~~~을 이용하여 Observable을 생성하는 방법

fun main(args: Array<String>){

    val list = listOf(1, 2, 3)
    val listOb = Observable.fromIterable(list)

    val call = Callable<int> { 4 }
    val callOb = Observable.fromCallable(call)

    val future = object: Future<Int>{
        override fun get() = 5
        override fun get(timeout: Long, unit: Timeout) = 6
        override fun isDone() = true
        override fun isCancellable() = false
    }

    val futureOb = Observable.fromFuture(future)
    
    val observer: Observer<Int> = object : Observer<Int> {
        override fun onComplete() { println("onComplete()") }
        override fun onNext(item: T) { println("onNext() - $item") }
        override fun onError(e: Throwable) { println("onError() - $e") }
        override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
    }
    
    listOb.subscribe(observer)
    callOb.subscribe(observer)
    futureOb.subscribe(observer)
}

from 함수의 다양한 형태

fromIterable() : list처럼 iterable을 지원하는 instance를 Observable 형태로 변환해준다.

  • 각 개별 아이템이 하나씩 전달 됨. fromCallable() : Callable 객체를 Observable 형태로 전달한다.
  • call() 함수의 return 값이 전달 된다. fromFuture() : Future 객체를 Observable 형태로 변환한다.
  • get() 함수의 return 값이 전달된다. from() : Iterable 또는 배열의 아이템을 순차적으로 발행하는 옵저버블을 생성한다. 예를들어 Observable.from(list) list 배열이 가지고 있는 요소들을 순차적으로 발행한다.

3 Observable.just를 이용하여 Observable을 생성하는 방법

fun main(args: Array<String>){
    val list = listOf(1, 2, 3)
    val num = 3
    val str = "wow!"
    val map = mapOf(1 to "one", 2 to "two")
    val justOb = Observable.just(list, num, str, map)
    val observer: Observer<Any> = object : Observer<Any>{
        override fun onComplete() { println("onComplete()") }
        override fun onNext(item: T) { println("onNext() - $item") }
        override fun onError(e: Throwable) { println("onError() - $e") }
        override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
    }
    justOb.subscribe(observer)
}

just는 인자로 들어온 모든 타입의 요소들을 전부 한꺼번에 반환 해 준다. list처럼 각 인덱스 값이 따로따로 필요하다면 fromIterable()을 사용해야 한다. 즉, List 타입의 요소를 사용하려면 fromIteravle()을 사용 하라는 것이다.

range

val observer: Observer<Any> = object : Observer<Any>{
    override fun onComplete() { println("onComplete()") }
    override fun onNext(item: T) { println("onNext() - $item") }
    override fun onError(e: Throwable) { println("onError() - $e") }
    override fun onSubscribe(d: Disposable) { println("onSubscribe() - $d") }
}

Observable.range(1,3).subscribe(observer)

위와 같이 range는 특정 범위 만큼 수를 생성하여 전달한다.

empty

val observer = object : Observer{ ... }
Observable.empty().subscribe(observer)

empty는 어떠한 값을 전달하지는 않지만 onComplete()를 호출 해 준다. 아 물론 observer가 호출될 때 실행되는 onSubscribe() 함수가 먼저 실행된다.

interval

val observer = object : Observer<Long>{ ... }
Thread().start() // 여기부터 thread 시작
val th1 = Thread(){ Thread.sleep(300) }
th1.start()
th1.join()

interval은 단어 뜻 그대로 시간차 간격을 두고 다음 로직을 처리하는 연산자이다.

never

never 아무런 아이템도 발행하지 않고, 완료도 발행하지 않는 옵저버블을 생성한다. 결과 값으로는 onNest, onComplete, onSubscribe, onError 이 중에 아무것도 실행되지 않는다.