Carpe Diem

備忘録

RxのflatMapの使い方

概要

AngularでHttpModuleを連続して使う必要が出た時に、「毎回subscribeでネストが深くなるのが嫌だなぁ」と思ってぐぐったところ、「flatMapを使うと良いよ!」という記事を見つけて使ってみようとしたのがきっかけです。

他のmapやfilterなどと違ってパッと理解できなかったのと、lodashのflatMap(arrayをflattenするやつ)のイメージのせいで詰まったのでまとめました。

環境

  • rxjs 5.1.0

flatMapはmap + mergeと考えるとイメージしやすい

公式のflatMapのイメージ図は以下です。

f:id:quoll00:20170225091903p:plain

ref: ReactiveX - FlatMap operator

初見だと分かりづらいですね。
なので map(itemを変換)とmerge(複数のObservableを束ねる)がくっついたもの と考えるとイメージしやすいです。

map

f:id:quoll00:20170225092433p:plain

ref: ReactiveX - Map operator

mapは流れてくるitemを逐次変換するoperatorです。

merge

f:id:quoll00:20170225092623p:plain

ref: ReactiveX - Merge operator

一方mergeは複数のObservableの流れがあるときに、それを1つにまとめたいという場合に使うoperatorです。

const source1 =  Rx.Observable.of(1, 2, 3);
const source2 =  Rx.Observable.of(4, 5, 6);

const merged = Rx.Observable.merge(source1, source2);

function test() {
  merged.subscribe(val => {
    console.log(val);
  });
}

RxJS Playground

結果

1
4
2
5
3
6

2つのStreamが束ねられてますね。

flatMap

先程の2つを踏まえて考えると以下のようなイメージができると思います。

f:id:quoll00:20170225095413p:plain

※:flatMapLatestはflatMapと結果が少し異なります(黄緑の□が消えてる)が、イメージとして載せてます

具体的な使い方(基本編)

1, 2, 3と順に渡して、それを2倍にするObservableに渡して束ねるようにします。

const source1 =  Rx.Observable.of(1, 2, 3);

const mapped = source1
  .flatMap(val => {
    return Rx.Observable.of(val * 2); 
  });

function test() {
  mapped.subscribe(val => {
    console.log(val);
  });
}

RxJS Playground

結果

2
4
6

ただこれだとmapで出来るのにわざわざObservable.ofで複雑にしてない?と思います。
その通りで、こういった書き方は普通しません。次のようにhttpModuleといったObservableなメソッドを使いたい時にメリットが生まれます。

具体的な使い方(応用編)

export class AppComponent {
  homeworld: Observable<{}>;
  constructor(private http: Http) { }
  
  ngOnInit() {
    this.homeworld = this.http.get('/api/people/1')
      .map(res => res.json())
      .flatMap(character => this.http.get(character.homeworld))
  }
}

このように1つ前のレスポンスの値characterを使って、次のhttpリクエストしたいと言った時にsubscribeのネストを作らずに書くことができます。
1つ前のレスポンスの値を使ってがポイントで、これが無ければ単に.merge()concatでもObservableを束ねることができるのでわざわざflatMapを使わなくても大丈夫です。

注意として、mergeの場合

var source1 = Rx.Observable.create(observer => {
  setTimeout(() => {
    observer.next(1);
    observer.complete();
  }, 2000);
});
var source2 = Rx.Observable.of(2);

function test() {
  source1.merge(source2).subscribe(
    val => {
      console.log(val);
    }
  );
}

RxJS Playground

のように前のobservableが遅い処理の場合、

2
1

と、mergeで後からくっつけたobservableの方が先に実行されます。順番を保証したい場合はconcatを使いましょう。

注意

concatmergeは複数のObservableを連結できますが、連結すれば当然subscribeしたときに結果が複数回きます
もしその後で結果の値を使ったり、遷移するロジックを入れていたりすると、そのロジックも複数回実行されるので最後の処理が終わったらこれをしたいという場合はlastを付けるようにしましょう。

ソース