はじめに
rxjs6.3 で動作確認。 RxJS 公式サイトから Operators の Marble diagrams を引っ張ってきて説明しています。 個人的に重要だと思う Operators のみ説明しています。 MultiCast Operators 系はまた別でまとめたいと思います。
図の見かた
まず、図の見かたですが、map(オペレータ名)を挟んで上の矢印と値がオペレータ適用前の Input Observable、下がオペレータ適用後の Output Observableと呼ばれるものです。 また、矢印は時間軸の流れを表しています。
オペレータは Input Observable を受け取って Output Observable を返す、その作用はどのタイミングで値を受け取るかによっても違ってくることが多いので、このような図となっています。 ちなみに、公式ではこの図は Marble diagrams と呼ばれています。
作成用オペレータ(Creation Operators)
from:イテレータから Observable を作成する
配列などイテラブルな値から Observable を作成します。
from([10, 20, 30]).subscribe((x) => console.log(x))
of:可変長引数から Observable を作成する
可変長引数から Observable を作成します。
of(1, 2, 3).subscribe((x) => console.log(x))
fromEvent:イベントから Observable を作成する
第一引数に DOM、第二引数にイベント名を渡し、イベント発火時に Observable が作成される。
fromEvent(document, "click").subscribe((e) => console.log(e))
interval:指定ミリ秒ごとに初期値 0 で 1 ずつ加算する値の Observable を作成する
interval(1000).subscribe((x) => console.log(x))
変換用オペレータ(Transformation Operators)
map:値を変換する
js でも Array に対して使われることの多いメソッドなので特に説明不要でわかる方も多いと思います。 引数に関数を取り、その関数で引数の値を変換して次に渡しています。
of(1, 2, 3)
.pipe(map((x) => 10 * x))
.subscribe((x) => console.log(x))
// 10
// 20
// 30
switchMap:ソースの Observable を引数の Observable に流す。被ったらキャンセルする
第 1 の Observable(ここでは of(1, 3, 5))をソースとして、第 2 の Observable に値を流して変換することができます。 第 1 の Observable の値は第 2 の Observable の値全てに流れます。
図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされ、次の値が処理されます(図だと 3 * 10 の 3 回目の処理は中断されています)。
of(1, 3, 5)
.pipe(switchMap((x) => of(10 * x, 10 * x, 10 * x)))
.subscribe((x) => console.log(x))
// 10
// 10
// 10
// 30
// 30
// (30) ← 本来はログに流れるが、次の値がやってきたら中断される
// 50
// 50
// 50
コードでは中断を表現できていませんが、それ以外はこんな感じです。
mergeMap:ソースの Observable を非同期的に引数の Observable に流す
第 1 の Observable をソースとして、第 2 の Observable に値を流して変換することができます。 第 1 の Observable の値は第 2 の Observable の値全てに流れます。
ここまでは上の switchMap と一緒ですね。 switchMap、mergeMap、次に説明する concatMap は第 1 の Observable を第 2 の Observable に流すというところまでは同じで、変換中に次の値が流れてきたときの処理だけが異なります。
mergeMap は、図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされず続けて行われ、非同期的に次の値も処理されます(図だと 3 10 の 3 回目の処理は中断されずに 5 10 の 1 回目の処理の後に行われています)。
of(1, 3, 5)
.pipe(mergeMap((x) => of(10 * x, 10 * x, 10 * x)))
.subscribe((x) => console.log(x))
// 10
// 10
// 10
// 30
// 30
// (50) ← 本来は30、50の順に流れるが、
// (30) ← 仮に変換途中に次の値(5)が流れてきた場合は非同期に処理されるのでこうなる場合がある
// 50
// 50
上記の switchMap と同じように、この Observable で表しても本来の mergeMap の非同期性を表せないのですが、わかりやすさのため図とあわせてログだけ mergeMap の非同期性を表したものにしています。
concatMap:ソースの Observable を同期的に引数の Observable に流す
mergeMap で見たように、第 1 の Observable をソースとして、第 2 の Observable に値を流して変換することができるところまでは switchMap、mergeMap と一緒です。
concatMap は、図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされず続けて行われ、同期的に次の値が処理されます(図だと 3 10 の処理の途中に、次の値である 5 を受け取れる状態になっていますが、3 10 の処理がすべて終わった後に 5 * 10 が行われています)。
of(1, 3, 5)
.pipe(concatMap((x) => of(10 * x, 10 * x, 10 * x)))
.subscribe((x) => console.log(x))
// 10
// 10
// 10
// 30
// 30
// 30
// 50
// 50
// 50
同期的に処理されるので、タイミングがどうあれログに流れる順番は必ずこうなる。
switchMap、mergeMap、concatMap をまとめると次の表のようになります。
オペレータ名 | 次の値が来たとき |
---|---|
switchMap | 現在の変換処理はキャンセルされ、次の値の変換処理が開始される |
mergeMap | 現在の変換処理は続けて行われ、次の値の変換処理が非同期的に開始される |
concatMap | 現在の変換処理は続けて行われ、次の値の変換処理が同期的に開始される |
フィルター用オペレータ(Filtering Operators)
filter:true を返す Observable のみ取得する
Array ライクな filter 同様、true を返す値のみ取得するようフィルターします。
interval(1000)
.pipe(filter((x) => x % 2 === 1))
.subscribe((x) => console.log(x))
// 1
// 3
// 5
// ......
take:指定数のみ Observable を取得する
interval(1000)
.pipe(take(5))
.subscribe((x) => console.log(x))
// 0
// 1
// 2
// 3
// 4
takeUntil:引数の Observable が流れるまで取得する
第 1 の Observable をソースに、引数の第 2 の Observable が流れたら subscribe 関数は complete を発行します。 第 2 の Observable の値はなんでもいいです。もし、第 2 の Observable が流れなければ第 1 の Observable はすべて流れるので、takeUntil はないのと一緒です。
interval(1000)
.pipe(takeUntil(fromEvent(document, "click")))
.subscribe((x) => console.log(x))
// 0
// 1
// 2
// -- clickするまで流れる --
distinctUntilChanged:連続した重複を排除する
連続した重複だけ排除したい場合に使用するのが distinctUntilChanged です。 値がオブジェクトで一部のプロパティの重複だけ感知したい場合は引数の無名関数で指定できます。 なお、連続した重複だけでなく、すべての重複を排除して一意なものだけ取り出す場合は distinct オペレータを使用します。
of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
.pipe(distinctUntilChanged())
.subscribe((x) => console.log(x))
// 1
// 2
// 1
// 2
// 3
// 4
throttleTime:指定ミリ秒間の Observable を間引く。
引数に指定したミリ秒以内の Observable はひとつしか流れないようにフィルターされます。 連続したクリックを除外したいときなどに使用します。
fromEvent(document, "click")
.pipe(throttleTime(1000))
.subscribe((x) => console.log(x))
debounceTime:指定ミリ秒間の Observable を間引き、指定ミリ秒後に Observable を返す
引数に指定したミリ秒以内の Observable はひとつしか流れないようにフィルターされます。 throttleTime と違い、指定ミリ秒後に遅れて Observable が流れます。
fromEvent(document, "click")
.pipe(debounceTime(1000))
.subscribe((x) => console.log(x))
接続用オペレータ(Join Operators)
startWith:Observable の最初の値をあとづけする
Observable の初期値をつけたいときに使用する。
of("a", "b", "c")
.pipe(startWith("s"))
.subscribe((x) => console.log(x))
// s
// a
// b
// c
withLatestFrom:引数の Observable の最新の値を伴わせる
下の例では click 時にだけログが流れ、イベントオブジェクトとともにその時点の interval が作成した Observable の値も流れる。
fromEvent(document, "click")
.pipe(withLatestFrom(interval(1000)))
.subscribe((x) => console.log(x))
// [MouseEvent, 1] // 1秒後にクリック
// [MouseEvent, 3] // 3秒後にクリック
// [MouseEvent, 7] // 7秒後にクリック