Michał Idzik
Informatyka AGH
- Observer otrzymuje notyfikacje o zmianie stanu Observable.
- Luźne powiązanie między obiektami:
Observable nie wie o implementacjach swoich obserwatorów.
- Observer otrzymuje kolejne elementy typu T, emitowane przez Observable.
- Emisja kończy się gdy zostaje wywołana metoda onComplete() lub onError(Throwable)
- Połączenie wzorców Observer i Iterator
Observable<Integer> source = Observable.just(0,1,2,3,4);Observer
może być prostym obiektem
typu Consumer<T>
realizującym onNext(T)
Observable
może emitować dowolną kolekcję elementów typu T
source.subscribe(number -> System.out.println(2*number))Observer
może definiować również handler
do obsługi błędów
Observable
może emitować obiekty w dowolny sposób, wywołując odpowiednie metody obserwatora.
Observable<Integer> source = Observable.create(observer -> {
for (int i = 0; i < 5; i++) {
if (i != 3) {
observer.onNext(i);
} else {
observer.onError(new Exception());
}
}
observer.onComplete();
});source.subscribe(number -> System.out.println(2*number),
error -> error.printStackTrace());Observable.just(0,1,2,3,4).map(number -> "a" + number).subscribe(text-> System.out.println(text));Observable.just(0,1,2,3,4).filter(number -> number % 2 == 1).subscribe(number ->
System.out.println(2*number));Observable.just(0,1,2,3,4).map(number -> "a" + number).subscribe(text-> System.out.println(text));.filter(number -> number % 2 == 1)Observable<Integer> source = Observable.just(0,1,2,3,4)
.filter(number -> number % 2 == 1)
.map(number -> "a" + number);source.subscribe(text-> System.out.println(text));source.subscribe(text-> System.out.println(text));o1 :
o2:
Observable<Integer> o3 = Observable.merge(o1, o2);o3:
- Wysokopoziomowe zarządzanie wątkami
- Gotowe rozwiązania dla typowych przypadków:
Schedulers.io() - zoptymalizowany pod operacje IO, każde nowe zadanie to nowy wątek.
Schedulers.computation() - przeznaczony dla długich operacji, utrzymuje optymalny rozmiar puli wątków.
...
- Konfiguracja bezpośrednio w łańcuchu operacji RxJava:
- Wywołanie subscribeOn() sprawia, że zarówno emisja obiektów,
jak i ich obsługa przez subskrybentów odbywa się na danym Schedulerze!
Uwaga: ustawienie Schedulera wpływa na cały strumień i nie sprawia,
że jego elementy przetwarzane są równolegle!
Tego typu efekt można osiągnąć używając operatorów
merge() lub flatMap().
Observable.just(0,1,2,3,4)
.map(number -> "a" + number)
.subscribeOn(Schedulers.io())
.subscribe(text-> System.out.println(text));Observable.just(1,2,3,4)
.flatMap(x -> Observable.fromIterable(Collections.nCopies(x, x)))
.subscribe(System.out::println);Observable<GroupedObservable<Boolean, Integer>> groupedObservable = Observable.just(0, 1, 2, 3, 4, 5, 6, 7, 8)
.groupBy(x -> x % 2 == 0);
groupedObservable.subscribe(group -> {
if (group.getKey()) {
processEven(group);
} else {
processsOdd(group);
}
});