Carpe Diem

備忘録

RxJSのfinallyの挙動

概要

Angularにはリクエストのクエリパラメータを取得する方法として、

@Component({...})
export class MyComponent implements OnInit {
  constructor(private activatedRoute: ActivatedRoute) {}

  ngOnInit() {
    this.activatedRoute.params.subscribe((params: Params) => {
        let userId = params['userId'];
        console.log(userId);
      });
  }
}

というようにActivatedRouteがあるのですが、今回はその中でAPI側コールし、もしかしたらエラーが発生する可能性もあるのでfinallyで後処理を必ずさせたい、という状況でした。

そこで

    this.activatedRoute.params
      .finally(() => {
        // 後処理
      })
      .subscribe((params: Params) => {
        let userId = params['userId'];

        // エラーが起きるかもしれないAPIコール
      });

としてみたところ、finallyの処理が実行されませんでした。何故でしょうか?

環境

  • rxjs 5.2.0

finallyはcompleteイベントが無いと実行されない

当然といえば当然なのですが、これを見落としていて詰まりました。

let obs = Rx.Observable.create(observer => {
  observer.next(1);
});

obs = obs.finally(() => {
  console.log('finally1');
});

obs.subscribe(
  function (x) {
    console.log('Next: ', x);
  },
  function (err) {
    console.log('Error: ', err);
  },
  function () {
    console.log('Completed');
  });

Finally - no complete

結果

"Next: " 1

これはfinallyの部分は実行されません。observer.complete()を実行していないためです。

先程のActivatedRouteは、実はcompleteイベントを実行していないので、finallyが実行されませんでした。
テストコードではObservable.of()でモック化しており、これは自動でcompleteイベントを送るので挙動が異なり見落としてしまっていました。

ネストした時はいつ実行されるのか

僕が詰まっていた問題は前述の件で解決できたのですが、問題のコードではネストした状態にもなっていて、それが原因なのだろうか?とも思って検証してみました。

let obs = Rx.Observable.of(1)

obs = obs.finally(() => {
  console.log('finally1');
});

let source = Rx.Observable.of(2);

obs.subscribe(
  v => {
    console.log(v);
    source.subscribe(
    function (x) {
      console.log('Next: ', x);
    },
    function (err) {
      console.log('Error: ', err);
    },
    function () {
      console.log('Completed');
    });
  }
);

Finally - nested

結果

1
"Next: " 2
"Completed"
"finally1"

ネストした部分が実行された後にfinallyが実行されています。

上流がcompleteしない場合、ネスト内でエラーが起きても実行されない

finallyは例外が起きたら実行されるとだけ考えていると、前述のネストの内部でエラーが起きたら発火してくれるんじゃないか?と感じます。

let obs = Rx.Observable.create(observer => {
  observer.next(1);
})

obs = obs.finally(() => {
  console.log('finally1');
});

let source = Rx.Observable.throw(new Error('some error'));

obs.subscribe(
  v => {
    console.log(v);

    source.subscribe(
    function (x) {
      console.log('Next: ', x);
    },
    function (err) {
      console.log('Error: ', err);
    },
    function () {
      console.log('Completed');
    });
  }
);

Finally - nested error

結果

1
"Error: " "some error"

ダメでした。つまりfinallyはfinallyをセットする前のStreamでcompleteイベントが発火されないとダメ、ということがわかります。
なので僕の最初のコードも、あの場所にfinallyをセットしていてもAPIコールで例外が起きたとしても処理されない、ということになります。

concatで繋げた時はいつ実行されるのか

let obs = Rx.Observable.of(1)

obs = obs.finally(() => {
  console.log('finally1');
});

let source = Rx.Observable.of(2);

obs.concat(source).subscribe(
  function (x) {
    console.log('Next: ', x);
  },
  function (err) {
    console.log('Error: ', err);
  },
  function () {
    console.log('Completed');
  }
);

Finally - concat

結果

"Next: " 1
"finally1"
"Next: " 2
"Completed"

concatで繋げられたものを実行する前にfinallyが実行されます。
ちなみにconcatは上流のStreamがcompleteしないと、後に繋げたStreamはそもそも実行されません。

上流でcompleteが保証されない場合はどうすればよいか

先程のAngularのように、ライブラリ側でcompleteしないことが分かっている場合どうすればよいでしょうか。

take(1)などでcompleteイベントを発火させる

let obs = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
}).take(1);

obs = obs.finally(() => {
  console.log('finally1');
});

let source = Rx.Observable.of(3);

obs.subscribe(
  v => {
    console.log(v);

    source.subscribe(
    function (x) {
      console.log('Next: ', x);
    },
    function (err) {
      console.log('Error: ', err);
    },
    function () {
      console.log('Completed');
    });
  }
);

Finally - take

結果

ちゃんとfinallyが実行されることが分かります。

1
"Next: " 3
"Completed"
"finally1"

unsubscribeすると強制的にcompleteしてくれる

また別の方法として、unsubscribeすることでcompleteを実行してくれます。

let obs = Rx.Observable.create(observer => {
  observer.next(1);
});

obs = obs.finally(() => {
  console.log('finally1');
});

let obs2 = Rx.Observable.of(2);

let source = obs.subscribe(
  v => {
    console.log(v);

    obs2.subscribe(
    function (x) {
      console.log('Next: ', x);
    },
    function (err) {
      console.log('Error: ', err);
    },
    function () {
      console.log('Completed');
    });
  }
)

source.unsubscribe();

Finally - unsubscribe

結果

1
"Next: " 2
"Completed"
"finally1"

unsubscribe - concatの場合

ちなみにconcatの場合どうなんでしょうか。

let obs = Rx.Observable.create(observer => {
  observer.next(1);
});

obs = obs.finally(() => {
  console.log('finally1');
});

let obs2 = Rx.Observable.of(2);

let source = obs.concat(obs2).subscribe(
  function (x) {
    console.log('Next: ', x);
  },
  function (err) {
    console.log('Error: ', err);
  },
  function () {
    console.log('Completed');
  });

source.unsubscribe();

Finally - concat unsubscribe

結果

finallyは実行されますが、上流しか実行されません。

"Next: " 1
"finally1"

Angularのhttp moduleはどうか

stackoverflow.com

ソース的にcompleteが呼ばれているので、finallyが実行されます。

まとめ

今回はgolangのdeferの遅延実行ように考えていて勘違いが起きました。

finallyの挙動として、

  • finallyをセットする前のStream(上流)で、completeイベントが実行されないと発火されない
  • finallyをセットした後のsubscribe内でエラーが起きても、上流がcompleteしないとfinallyの処理は実行されない

という事がわかります。なので

  • 上流でcompleteが実行されるのかどうか
  • finallyをセットする場所(どこで例外が起きうるのか)

についてしっかり考えてから利用しましょう。

ソース