Kattsu Sandbox

RxJS入門#1 基本の概念をひとつずつ学ぶ

投稿日:

はじめに

rxjs6.3 で動作確認。

RxJS とはなにか

非同期とイベントのための Observer パターンを使ったライブラリ。 イベントで渡ってきたデータを自由に加工することができるので、公式サイトでは「イベント用の Lodash」と紹介されています。

Think of RxJS as Loadash for events.

非同期といえば js にはすでに Promise や async/await などの仕組みがありますが、それらとは何が違い RxJS だと何が嬉しいんでしょうか。 また、イベントといえば click とかだと思いますが、イベントから渡ってきたデータの加工をわざわざ非同期用のライブラリでするってなんだ?って感じですね。

今回は基本の概念をひとつずつ次の順に見ていきます。

  • Observable:イベントや値を RxJS で受け取れる形にする
  • Operators:受け取ったイベントや値を加工する
  • Subject:Observable を同時にいろんなところで受け取れるようにする(マルチキャストを可能にする)
  • Subscription:subscribe の解除を行う

Observable:イベントや値を RxJS で受け取れる形にする

従来の click イベント

document.addEventListener("click", (event) => console.log(event))

RxJS の click イベント

import { fromEvent } from "rxjs"

fromEvent(document, "click").subscribe((event) => console.log(event))

見慣れた従来の click イベントはいいとして、RxJS の方もなにかの結果をメソッドチェーンで受け取るというこの書き方にはちょっと見覚えがあります。 subscribe という単語にこそなってますが、これは ajax や Promise で使ってきた then に近いものがありそうですね。 あれは非同期の処理結果を jqXHR オブジェクトや Promise オブジェクトとして返してもらって、callback 地獄ではなくわかりやすくその後の処理を書けるみたいな感じだったと思います。 今回は fromEvent メソッドの返り値を subscribe があたかも then のように受け取っているわけですが、この fromEvent はなにを返しているんでしょうか。

この返してもらっているものがRxJS の肝である Observable オブジェクトです。 ここでは Event オブジェクトをラッパーした Observable が subscribe の引数に渡り、その後の処理に使用できるようになっています。

ちなみに、Observable が新たに生成されたとき(例えばイベントが発火したときなど)や変更したときを監視しているのが Observer というものです。Observable が監視される側、Observer が監視する側と単純に覚えておけばいいでしょう。 subscribe は Observable のメソッドですが、イメージ的にはObserver が Observable を subscribe していると考えた方がいいと思います。

Observable は公式サイトでは「未来の値やイベントのコレクションを呼び出せるもの」というような説明がされています。

Observable: represents the idea of an invokable collection of future values or events.

未来の値やイベントを呼び出せるというのは、「登録時点ではイベントが起きていなくても Observable としては登録できて subscribe して呼び出す準備ができる」といったような意味だと思います。

上記はイベントで説明しましたが、イベントである必要はないので例えば値をラッパーする Observable も作ることができます。

Observable インスタンスを直接作成

import { Observable } from "rxjs"

const observable = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
})

observable.subscribe((x) => console.log(x))

Observable は new からインスタンスを作る際は、引数に subscribe 関数を取ります。 subscribe 関数は引数に subscriber を取り、subscriber の next メソッドで値を渡していきます。

この書き方は冗長なので次のように書くこともできます。

from で値を渡して Observable を作成

import { from } from "rxjs"

const observable = from([1, 2, 3])
observable.subscribe((v) => console.log(v))

fromEvent でイベントから Observable を作れるように、from は Array ライクなオブジェクトを引数に取り、Observable を作ることができます。また、from ではなく of を使うと、可変長の引数を渡して Observable を作ることができます。

of で可変長引数に値を渡して Observable を作成

import { from } from "rxjs"

const observable = of(1, 2, 3)
observable.subscribe((v) => console.log(v))

未来の値やイベントを呼び出せるという意味はなんとなくわかりました。 では、「未来の値やイベントのコレクションを呼び出せるもの」のコレクションとはなんでしょうか?

Operators:受け取ったイベントや値を加工する

Observable に pipe メソッドで処理を追加してコレクション化

import { fromEvent } from "rxjs"
import { mapTo, scan } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(
    mapTo(1), // eventオブジェクトを1に変換
    scan((count, click) => count + click, 0) // countに1を加算する。countの初期値は0
  )
  .subscribe((count) => console.log(`Clicked ${count} times`))

Observable を pipe でつなげることにより、その中で順番にイベントや値を加工していくことができます。 最初に RxJS をイベント用の Lodash と言いましたが、要はコレクションとは Lodash のように値やイベントを自在に加工した後のもの的な意味です。

この加工に使える mapTo や scan は Operators と呼ばれています。map,filter,reduce,every みたいな Array に使うメソッドや Lodash っぽいものが集まっています。scan は reduce みたいなものです。 ちなみに fromEvent や from などの Observable を最初に作り出すものは Creation Operators と呼ばれます。 Operators は 100 ぐらいあり、Operators の中でも加工用以外種類もあったりするのですが今回はあまり扱いません。

さて、イベントを非同期のような仕組みで書けて、さらにそのイベントや値を自在に変換してその後の処理に渡せることはわかりました。 では、それが嬉しいパターンってなんでしょうか?イベントの値を変換するだけなら RxJS でなくてもよさそうです。

コンポーネント間のイベントのやり取りが複雑な SPA でこそ RxJS は使える

話は少し変わりますが、SPA フレームワークでのコンポーネント間のイベントの伝播って大体子から親に emit してバケツリレーしていく形ですよね。 子から親ならシンプルですが、例えば子 → 親 → 別の子に伝播させたかったりするケースだけ考えてもけっこう面倒です。 ここで未来のイベントを呼び出せるという RxJS の考え方が活きてきます。

通常の emit が水(イベント)をバケツに入れてリレーさせていくと考えるとすれば、RxJS は水(値やイベント)を水道に流し各所(コンポーネント)で待ってるだけで水が流れてくるようにできるイメージだと思います。

例えるとこんな感じだと思います。

たとえRxJX の各要素
Observable(にラップされたイベントや値)
水道管Observer(Observable を監視する)
ろ過Operators
蛇口を開けるsubscribe メソッド

バケツリレーよりも圧倒的に便利そうです。 ただし、バケツリレーなら特になんの設備もなくバケツだけで水を運ぶことができますが、水道を整備するとなったら各所に水を届ける仕組みが必要で、そのあたりが少々面倒くさいところです。 なのでバケツリレーで十分ならそれでよし、複雑になりそうなら水道の手配を考えるといった使い分けがいいのではないでしょうか。

ちなみに Redux や Vuex の Store パターンは水道というよりダムを用意してあげて、すべての水はそこに置いとくから後は各所で取りに行ってねというイメージかなと思います。(水道もダムも水で説明してますが、水そのものが流れてくるというより、ペットボトルに入ったラベル付けされた水が水道から流れてくるなり、ダムに貯まってるようなイメージでしょうか。)

さて、ではどうやって各所にこの水を届ける仕組みを整えてあげられるのでしょうか? これは Subject という仕組みが使われることが多いです。

Subject:Observable を同時にいろんなところで受け取れるようにする(マルチキャストを可能にする)

今までの説明では特に問題ありませんでしたが、Observable だけでは同時にひとつの Observer にしか値やイベントを流すことができません(ユニキャスト)。 例えば、次のように 1 つの Observable を 2 つの Observer に流すことはできますが、同時にひとつの Observable が流れることはなく、まず先に subscribe しているところに Observable が流れ終わってから、次の subscribe しているところに流れることになります。

Observable 単独ではユニキャストで Observer に流すことしかできない

import { from } from "rxjs"

const observable = from([1, 2, 3])

observable.subscribe((v) => console.log(`observerA: ${v}`))
observable.subscribe((v) => console.log(`observerB: ${v}`))

// Log
// observerA: 1
// observerA: 2
// observerA: 3
// observerB: 1
// observerB: 2
// observerB: 3

これは処理も軽いですし、同じコンポーネント内での例なので少しイメージがつきづらいですが、時間のかかる処理を複数のコンポーネントで処理したい場合であれば非同期的に処理したいはずです。 Observable 単独ではユニキャストでしか処理できないところを、Subject を使用することでマルチキャストで複数の Observer に流すことができるようになります。

A Subject is like an Observable, but can multicast to many Observers

Subject は Observable のように振る舞いますが、複数の Observer にマルチキャストできるようになります。 つまり下記のようなことが可能になります。

Subject を間に挟むことでマルチキャストで Observer に流すことができる

import { Subject, from } from "rxjs"

const subject = new Subject<number>()

subject.subscribe((v) => console.log(`observerA: ${v}`))
subject.subscribe((v) => console.log(`observerB: ${v}`))

const observable = from([1, 2, 3])
observable.subscribe(subject)

// Log
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

まず Subject インスタンスを直接作成した後、それを subscribe したときの動作を登録します。 今までは Observable を subscribe していたため少し変にも思えますが、Subject は Observable のように振る舞うためこのような書き方が可能です。 そして、Observable 自体の subscribe には先程作った Subject を渡してやります。

こうすることで、 Observable → Subject → Observer            ↘ Observer

の流れができ、Subject からはマルチキャストで Observer に届けられます。

Subject は Observable 自体としても使える

import { Subject } from "rxjs"

const subject = new Subject<number>()

subject.subscribe((v) => console.log(`observerA: ${v}`))
subject.subscribe((v) => console.log(`observerB: ${v}`))

subject.next(1)
subject.next(2)
subject.next(3)

// Log
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

Subject は Observable のように振る舞うので Observable 自体としても使うことができます。 あとから Subject に next メソッドで値やイベントを流すことができます。

Subject には様々な種類があるのですがここでは説明しません。

Subscription:subscribe の解除を行う

これまでもちょっと出てきた Subscription を最後に説明したいのですが、その前に subscribe メソッドについて深掘りします。

subscribe メソッドの引数は本来は next、error、complete メソッドを持つ

import { from } from "rxjs"

const observable = from([1, 2, 3])

observable.subscribe({
  next: (x) => console.log("got value " + x),
  error: (err) => console.error("something wrong occurred: " + err),
  complete: () => console.log("done"),
})

// Log
// got value 1
// got value 2
// got value 3
// done

今までは subscribe メソッドに無名関数をひとつ渡してきただけでしたが、本来は subscribe メソッドは next、error、complete の 3 つのメソッドを持ちます。

それぞれの意味はなんとなくわかると思いますが、 next は Observable を受け取ったとき、 error は Observable の受け取りに失敗したとき、 complete は Observable を受け取り終わったとき、 にそれぞれ発火します。

subscribe メソッドは連想配列形式でこれらメソッドを渡すこともできますが、無名関数を順番に引数に渡すだけでも next、error、complete の順に関数が登録されていきます。 なので、上の書き方は次のように書くこともできます。

subscribe メソッドには引数に無名関数を渡すだけでもいい

import { from } from "rxjs"

const observable = from([1, 2, 3])

observable.subscribe(
  (x) => console.log("got value " + x),
  (err) => console.error("something wrong occurred: " + err),
  () => console.log("done")
)

// Log
// got value 1
// got value 2
// got value 3
// done

渡した無名関数の順に next、error、complete になります。 もし、error、complete が不要であれば第一引数に next 用の無名関数を渡せばいいだけなので、今まではこれを使用してきました。

さて、subscribe メソッドがわかったところで Subscription について見ていきます。

Subscription は subscribe メソッドの戻り値

import { interval } from "rxjs"

const observable = interval(1000)
const subscription = observable.subscribe((x) => console.log(x))

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)

// Log
// 0
// 1
// 2
// 3
// 4

Subscription は subscribe メソッドの戻り値です。 その主要な役割はunscribe による subscribe の解除です。

上の例では、interval で 1 秒に 1 回、1 ずつ増える Observable が渡ってきますが、5 秒後に unsubscribe しているため、それ以降ログが吐かれることはありません。

まとめ

RxJS でイベントや値を流す側と、受け取る側を疎結合にできるので、SPA のように複数のコンポーネントでそれらを分けて管理したいときに便利そうなことがわかりました。 最後に水と水道で例えるとこのようになるかと思います。

たとえRxJX の各要素
Observable(にラップされたイベントや値)
水道管Observer(Observable を監視する)
複数の水道管につなげられる仕組みSubject
ろ過Operators
蛇口を開けるsubscribe メソッド
蛇口を閉めるSubscription.unsubscribe メソッド

参考

RxJS 公式サイト

書いている人

大阪でソフトウェアエンジニアとして働いています。

© 2020 Kattsu Sandbox