UniRxのディープなつまずきの紹介

こんにちは。はじめましての人ははじめまして。クラスター株式会社でSoftware Engineerをしているfaidraです。(なぜかUniRxの記事を書きがち)

clusterに限らず、インタラクティブで大規模なアプリケーションには リアクティブプログラミング(Rx) が使われている箇所が多くあると思います。clusterではUnity部分の実装としてUniRxというライブラリを使っており、大変重宝しております。
UniRxは大変便利なのですが、長く使っているとやはりいろいろな問題に出くわすことがあります。
そんな問題の中から今回は、なかなか気づきづらいディープなつまずきをしたケースを2つ紹介したいと思います。

Disposeしたけど動いている?

clusterではユーザーは様々なアバターにリアルタイムで着替えることが出来ます。
アバターを表示するには様々なデータを非同期的にロードする必要があり、また使い終わった後にリソースを破棄する必要があるため、そのあたりを抽象化して1つのDisposableとして作成していました。

どのアバターを使うかのObservable()
  .Subscribe(avatar =>
  {
    現在のアバター.Dispose();
    現在のアバター = アバターのタスク(avatar);
  });

また、clusterにはメモリ負荷などを抑えるため、1つのアバターに対し複数のモデルを動的に切り替える、AvatarLODと呼ばれている仕組みがあります。その機能のクライアント側の実装として、1つのアバターのタスクの中で複数のモデルを切り替える処理が書かれています。

IDisposable アバターのタスク(Avatar avatar)
{
  var disposables = new CompositeDisposable();

  どのモデルを使うかのObservable()
    .Subscribe(model =>
    {
      現在のモデル.Dispose();
      現在のモデル = モデルのタスク(model);
    }).AddTo(disposables);

  現在のモデルを破棄.AddTo(disposables);

  return disposables;
}

これらのコードによって

  • 新しいアバターをロードする前に、古いアバターのタスクはDisposeされること
  • アバターのタスクをDisposeすれば、必ずモデルのタスクはDisposeされること

が満たされ、モデルのリソースはきちんとリリースされる、そのはずでした(自信がある人はどこに問題があるか考えてみましょう)。
しかし、QAチームから「アバターを切り替えるとメモリリークしているようだ」との報告が上がってきます。
初めはそんなことないやろ~と思っていたのですが、調査の結果、どうやらDisposeした後にモデルのタスクが実行されているようだということが分かりました。

Observableは根底をたどるとSubjectを利用していることが多いです(今回のケースではReactiveDictionary.ObserveReplaceでした)。そして、Subjectの内部ではListObserverというものが使われており、この実装は以下のようになっています。

private readonly ImmutableList<IObserver<T>> _observers;

public void OnNext(T value)
{
  var targetObservers = _observers.Data;
  for (int i = 0; i < targetObservers.Length; i++)
  {
    targetObservers[i].OnNext(value);
  }
}

使われているのはImmutableListであり、一度OnNextが始まれば、その途中で何かがSubscribeされようがDisposeされようが、OnNextされた時点で登録されていたObserverに対してOnNextを呼んでいることが分かります。つまり、購読がDisposeがされていても、そのOnNextが呼ばれるケースが存在するということが分かります(Subject以外のObservableでも同様の挙動になっているはずです)。

subject.Subscribe(A);
subject.Subscribe(B);
subject.OnNext
└ListObserver.OnNext
  └ImmutableList.Data // ここでA, Bが列挙されることが確定
    ├A.OnNext // ここでsubjectやBをDisposeしようとも……
    └B.OnNext // これは必ず呼ばれる!

今回のケースでは、「どのアバターを使うかのObservable」と「どのモデルを使うかのObservable」が元を辿れば同じSubjectだったためこの現象が起こりました。

解決方法としては、

  • Disposeされた後の挙動を必ずカバーする

    • 例えばSingleAssignmentDisposableやSerialDisposableを活用する
  • コンテキストに応じた情報に正しく絞る

    • 今回のケースで言うと、どのモデルを使うかではなく、特定のアバターのうちのどのモデルを使うかとする

などが考えられます。

IDisposable アバターのタスク(Avatar avatar)
{
  var disposables = new CompositeDisposable();
  var serialDisposable = new SerialDisposable().AddTo(disposables);

  指定したアバターの中でどのモデルを使うかのObservable(avatar)
    .Subscribe(model =>
    {
      var newModelDisposable = new SingleAssignmentDisposable();
      serialDisposable.Disposable = newModelDisposable; // ここで今あるものがDisposeされる
      newModelDisposable.Disposable = モデルのタスク(model); // Dispose後であれば、ここでモデルのタスクが即座にDisposeされる
    }).AddTo(disposables);

  return disposables;
}

例外後にも動作が残る?

ネットワークを使ったアプリケーションでは、非同期の動作を行いつつ例外の処理もしなければいけないというケースが多数あると思います。
UnityでもUniTaskの普及によってasync/awaitが使われることが多くなり、単純なタスクとしてObservableを使う機会は少なくなりました。
しかし、情報のフローを制御したり、随時切り替わる情報を利用したりする場合にはやはりObservableが便利で、例外処理も考慮したObservableによる処理を書く機会もあると思います。

表示したい情報源のObservable()
  .Select(情報の取得)
  .Switch() // このSwitchがTaskだと結構めんどう
  .OnErrorRetry(エラーの表示)
  .Subscribe(取得した情報の表示);

しかし、実際にはこのコードは意図した通りには動きません。情報の取得に失敗するごとに購読が溜まっていき、2回目以降の情報取得失敗時にInvalidOperationExceptionが発行されてしまいます。
Switchなどの情報をマージするオペレーター(他にはMergeやSelectMany)の直後に例外の処理を行う(Catch系のオペレーターやSubscribeを書く)とこのようなことが起こるようです。

調べたところ、マージ系のオペレーターの内部で例外が発生した時、そのオペレーターは自身をDisposeせず、また例外処理を行うObserverはOnErrorを受けても親をDisposeしないのが原因でした。
この現象が起こると、マージ系のオペレーターの購読が残り続け、購読の一部が動いたまま消すことができない状態になってしまいます。

class SwitchObserver : OperatorObserverBase<IObservable<T>, T>
{
  // Subuscribeしているが、このクラス自体の状態は変えていない
  public IDisposable Run()
  {
    var subscription = parent.sources.Subscribe(this);
    return StableCompositeDisposable.Create(subscription, innerSubscription);
  }

  public override void OnError(Exception error)
  {
    lock (gate)
    {
      try { observer.OnError(error); }
      finally { Dispose(); } // このクラスしかDisposeしておらず、parentはノータッチ
    }
  }
}

Catchするのがマージ系のオペレーターの直後ではない場合にはこの挙動は起こらないので、UniRxのバグではないかと自分は考えています(まだマージはされていませんがPRを出しています)。
例外周りで意図せぬ挙動が起こった時にはこの記事を思い出して、自身のプロジェクトにこのPRのパッチを当ててもらえるといいかなと思います。

さいごに

Rxはインターフェースは非常にシンプルですが、ライブラリ内ではコーナーケースの対応や細かな最適化がなされており、一見して挙動が分かりづらいと感じるかもしれません。ただ、UniRxの内部は一貫して構築されていることが多いので、コツさえ掴めば案外読むことができ、Rxの本質把握に大いに役立てることが出来ます(例えば、多くのオペレーターはSubscribeCoreやRunという関数に動作のロジックが書かれています)。

沼のような内容の話になってしまいましたが、この記事がどこかで皆さんの役にたてば嬉しいです。
インタラクティブなアプリケーションの構築に腕を振るいたいというエンジニアの方はぜひクラスターにお声がけください