RxJava

Michał Idzik
Informatyka AGH

Wzorzec Obserwator

- Observer otrzymuje notyfikacje o zmianie stanu Observable.

- Luźne powiązanie między obiektami:
  Observable nie wie o implementacjach swoich obserwatorów.

 

Model

ReactiveX Observable

- Observer otrzymuje kolejne elementy typu T, emitowane przez Observable.

- Emisja kończy się gdy zostaje wywołana metoda onComplete() lub onError(Throwable)

- Połączenie wzorców Observer i Iterator

 

 

Observable<Integer> source = Observable.just(0,1,2,3,4);

Observer

może być prostym obiektem
typu Consumer<T>
realizującym onNext(T)

Observable

może emitować dowolną kolekcję elementów typu T

source.subscribe(number -> System.out.println(2*number))

RxJava - jak to działa?

Observer

może definiować również handler
do obsługi błędów

Observable

może emitować obiekty w dowolny sposób, wywołując odpowiednie metody obserwatora.

Observable<Integer> source = Observable.create(observer -> {
    for (int i = 0; i < 5; i++) {
        if (i != 3) {
            observer.onNext(i);
        } else {
            observer.onError(new Exception());
        }
    }
    observer.onComplete();
});
source.subscribe(number -> System.out.println(2*number),
                 error -> error.printStackTrace());

RxJava - jak to działa?

Observable.just(0,1,2,3,4)
.map(number -> "a" + number)
.subscribe(text-> System.out.println(text));

Przetwarzanie danych:
map()

Observable.just(0,1,2,3,4)
.filter(number -> number % 2 == 1)
.subscribe(number -> 
    System.out.println(2*number));

Przetwarzanie danych:
filter()

Observable.just(0,1,2,3,4)
.map(number -> "a" + number)
.subscribe(text-> System.out.println(text));
.filter(number -> number % 2 == 1)

Przetwarzanie danych:
Złożenie operacji

Observable<Integer> source = Observable.just(0,1,2,3,4)
.filter(number -> number % 2 == 1)
.map(number -> "a" + number);
source.subscribe(text-> System.out.println(text));
source.subscribe(text-> System.out.println(text));

Ponowne użycie strumieni

Łączenie wielu strumieni
merge()

o1 : 

o2: 

Observable<Integer> o3 = Observable.merge(o1, o2);

o3: 

Wielowątkowość
Schedulers

- Wysokopoziomowe zarządzanie wątkami

- Gotowe rozwiązania dla typowych przypadków:

 

 

Schedulers.io() - zoptymalizowany pod operacje IO, każde nowe zadanie to nowy wątek.

 

Schedulers.computation() - przeznaczony dla długich operacji, utrzymuje optymalny rozmiar puli wątków.

...

 

Wielowątkowość
Schedulers

- Konfiguracja bezpośrednio w łańcuchu operacji RxJava:

 

 

- Wywołanie subscribeOn() sprawia, że zarówno emisja obiektów,
jak i ich obsługa przez subskrybentów odbywa się na danym Schedulerze!

 

Uwaga: ustawienie Schedulera wpływa na cały strumień i nie sprawia,
że jego elementy przetwarzane są równolegle!

Tego typu efekt można osiągnąć używając operatorów
merge() lub flatMap().

 

Observable.just(0,1,2,3,4)
.map(number -> "a" + number)
.subscribeOn(Schedulers.io())
.subscribe(text-> System.out.println(text));

Rozdzielanie strumieni
flatMap()

Observable.just(1,2,3,4)
          .flatMap(x -> Observable.fromIterable(Collections.nCopies(x, x)))
          .subscribe(System.out::println);

Rozdzielanie strumieni
groupBy()

Observable<GroupedObservable<Boolean, Integer>> groupedObservable = Observable.just(0, 1, 2, 3, 4, 5, 6, 7, 8)
                                                                              .groupBy(x -> x % 2 == 0);

groupedObservable.subscribe(group -> {
    if (group.getKey()) {
        processEven(group);
    } else {
        processsOdd(group);
    }
});