Hatena::ブログ(Diary)

CLOVER

2018-01-07

ReactorのSchedulerをちょっと見てみる

前々からReactorのSchedulerというものをぼやっとしか見てこなかったのですが、ちょうどそういう時期にこちらの
エントリが書かれたので、ちょうどよい機会だなぁと思って自分でも見てみることにしました。

ReactorでN+1問題な処理を実装してみた話 - 谷本 心 in せろ部屋

こちらのエントリでは、「ノンブロッキングでN+1にチャレンジ」という話なのですが、今回はもうちょっとSchedulerに
焦点を当てたいと思います。

準備

まあ、いろいろ言う前に、とりあえず簡単に動かすところからやっていこうと思います。

まずは準備から。Maven依存関係は、こちら。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>

        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.0.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.0.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

テストコード用に、JUnit 5を足しています(Maven Surefire Pluginの部分は端折ります)。

あとLogbackも足しているので、簡単な設定ファイルを用意しておきます。
src/test/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

テストコードの雛形は、こんな感じ。
src/test/java/org/littlewings/reactor/SchedulersTest.java

package org.littlewings.reactor;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

public class SchedulersTest {
    void sw(Runnable runnable) {
        long startTime = System.nanoTime();

        runnable.run();

        long elapsedTime = System.nanoTime() - startTime;
        long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert(elapsedTime, TimeUnit.NANOSECONDS);
        System.out.printf("elapsed time = %1$,3d msec%n", elapsedTimeInMillis);
    }

    // ここに、テストを書く!
}

一応、時間を計測するためのメソッドも用意。

動かしてみる

さて、まずは単純な例から。こういうコードを用意。

    @Test
    public void simpleFlux() {
        sw(() -> {
            Flux<Integer> flux =
                    Flux.fromStream(IntStream.rangeClosed(1, 50).boxed())
                            .log();

            StepVerifier
                    .create(flux)
                    .expectNextCount(49)
                    .expectNext(50)
                    .verifyComplete();
        });
    }

こんな結果になります。

20:00:42.178 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:00:42.256 [main] INFO  reactor.Flux.Stream.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
20:00:42.262 [main] INFO  reactor.Flux.Stream.1 - | request(unbounded)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(1)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(2)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(3)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(4)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(5)

〜省略〜

20:00:42.267 [main] INFO  reactor.Flux.Stream.1 - | onNext(46)
20:00:42.267 [main] INFO  reactor.Flux.Stream.1 - | onNext(47)
20:00:42.267 [main] INFO  reactor.Flux.Stream.1 - | onNext(48)
20:00:42.267 [main] INFO  reactor.Flux.Stream.1 - | onNext(49)
20:00:42.267 [main] INFO  reactor.Flux.Stream.1 - | onNext(50)
20:00:42.268 [main] INFO  reactor.Flux.Stream.1 - | onComplete()
elapsed time = 470 msec

次に、Flux#withIntervalを使ってちょっとデータ生成に時間をかけるようにしてみましょう。

    @Test
    public void withInterval() {
        sw(() -> {
            Flux<Integer> flux =
                    Flux.interval(Duration.ofMillis(100L))
                            .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed()))
                            .log()
                            .map(Tuple2::getT2);

            StepVerifier
                    .create(flux)
                    .expectNextCount(49)
                    .expectNext(50)
                    .verifyComplete();
        });
    }

ログ。

20:05:17.533 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:05:17.596 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
20:05:17.602 [main] INFO  reactor.Flux.Zip.1 - request(unbounded)
20:05:17.711 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([0,1])
20:05:17.809 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([1,2])
20:05:17.910 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([2,3])
20:05:18.010 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([3,4])
20:05:18.109 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([4,5])

〜省略〜

20:05:22.309 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([46,47])
20:05:22.409 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([47,48])
20:05:22.510 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([48,49])
20:05:22.609 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([49,50])
20:05:22.613 [parallel-1] INFO  reactor.Flux.Zip.1 - onComplete()
elapsed time = 5,411 msec

50要素を、100msecごとに…5秒ちょっと。

ちょっとN+1っぽいものも試してみましょう。

    @Test
    public void relation() {
        sw(() -> {
            Flux<Integer> flux =
                    Flux.interval(Duration.ofMillis(100L))
                            .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed()))
                            .log("source-stream")
                            .map(Tuple2::getT2)
                            .flatMap(
                                    i ->
                                            Flux.interval(Duration.ofMillis(100L))
                                                    .zipWith(Flux.just(2))
                                                    .log("doubling")
                                                    .map(t -> t.getT2() * i)
                            );

            StepVerifier
                    .create(flux)
                    .expectNextCount(49)
                    .expectNext(100)
                    .verifyComplete();
        });
    }

ログ。

20:06:36.475 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:06:36.544 [main] INFO  source-stream - onSubscribe(FluxZip.ZipCoordinator)
20:06:36.548 [main] INFO  source-stream - request(256)
20:06:36.656 [parallel-1] INFO  source-stream - onNext([0,1])
20:06:36.660 [parallel-1] INFO  doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:06:36.660 [parallel-1] INFO  doubling - request(32)
20:06:36.754 [parallel-1] INFO  source-stream - onNext([1,2])
20:06:36.755 [parallel-1] INFO  doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:06:36.755 [parallel-1] INFO  doubling - request(32)
20:06:36.762 [parallel-2] INFO  doubling - onNext([0,2])
20:06:36.763 [parallel-2] INFO  doubling - onComplete()
20:06:36.763 [parallel-2] INFO  source-stream - request(1)
20:06:36.854 [parallel-1] INFO  source-stream - onNext([2,3])

〜省略〜

20:06:41.456 [parallel-1] INFO  doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:06:41.456 [parallel-1] INFO  doubling - request(32)
20:06:41.458 [parallel-1] INFO  doubling - onNext([0,2])
20:06:41.458 [parallel-1] INFO  doubling - onComplete()
20:06:41.458 [parallel-1] INFO  source-stream - request(1)
20:06:41.557 [parallel-1] INFO  source-stream - onNext([49,50])
20:06:41.559 [parallel-2] INFO  doubling - onNext([0,2])
20:06:41.560 [parallel-2] INFO  doubling - onComplete()
20:06:41.560 [parallel-1] INFO  doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:06:41.560 [parallel-2] INFO  source-stream - request(1)
20:06:41.560 [parallel-1] INFO  doubling - request(32)
20:06:41.562 [parallel-1] INFO  source-stream - onComplete()
20:06:41.670 [parallel-3] INFO  doubling - onNext([0,2])
20:06:41.671 [parallel-3] INFO  doubling - onComplete()
elapsed time = 5,483 msec

先ほどの結果と、ほぼ同じ実行時間ですね。なるほど。

ところで最初の例だと、ログに出力されていたスレッドはmainっぽくて

20:00:42.256 [main] INFO  reactor.Flux.Stream.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
20:00:42.262 [main] INFO  reactor.Flux.Stream.1 - | request(unbounded)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(1)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(2)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(3)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(4)
20:00:42.263 [main] INFO  reactor.Flux.Stream.1 - | onNext(5)

withIntervalをかませた途端、parallelになったような気がします。

20:05:17.596 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
20:05:17.602 [main] INFO  reactor.Flux.Zip.1 - request(unbounded)
20:05:17.711 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([0,1])
20:05:17.809 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([1,2])
20:05:17.910 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([2,3])
20:05:18.010 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([3,4])
20:05:18.109 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext([4,5])

さて、これはどうしたことでしょう。

Scheduler

ここで、実行されるスレッドが変わったのは、Schedulerが関わっているようです。

Schedulerって?

Schedulers

Reactorにおいて、どのように処理が実行されるかはこのSchedulerによって決定されるようです。

Schedulerには、Reactorから4種類のものが提供されています。

Schedulers (Reactor Core 3.1.2.RELEASE)

  • immediate … 現在のスレッドを使うScheduler
  • single … 単一のスレッドを使うScheduler
  • elastic … 必要に応じて、新しいWorkerを生成するScheduler。長時間アイドル状態(デフォルト60秒)になると、破棄される。ブロッキングなIO処理に適している
  • parallel … CPUコア数で固定された数のWorkerを使用するScheduler

それぞれ、Schedulersクラスのstaticメソッドとしてデフォルトのものが用意され、Schedulerインターフェースの実装が返ってきます。
Scheduler (Reactor Core 3.1.2.RELEASE)

各Schedulerの実装は、こちら。
https://github.com/reactor/reactor-core/tree/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler

single、elastic、parallelについては、必要に応じてnewXxxx(XxxxにはSchedulerの種類が入る)にてSchedulerを作成することもできますし、
j.u.c.Executorやj.u.c.ExecutorServiceからもSchedulerを作ることが可能です。

これらにて取得・作成したSchedulerを、Publisher(Mono/Flux)のpublishOnやsubscribeOnに設定することで、指定したScheduler上でタスクを動作させる
ことができるようになります。

https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Flux.html#publishOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Flux.html#subscribeOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Mono.html#publishOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Mono.html#subscribeOn-reactor.core.scheduler.Scheduler-

FluxやMonoの図を見ると、publishOn/subscribeOnのそれぞれで、どういう箇所に影響があるかわかりますね。

Flux#publishOn
f:id:Kazuhira:20180107202944p:image

Flux#subscribeOn
f:id:Kazuhira:20180107202941p:image

使ってみる

それでは、提供されているSchedulerでちょっと動作を切り替えてみましょう。

single。

    @Test
    public void relationOnSingle() {
        sw(() -> {
            Flux<Integer> flux =
                    Flux.interval(Duration.ofMillis(100L))
                            .publishOn(Schedulers.single())
                            .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed()))
                            .log("single-source-stream")
                            .map(Tuple2::getT2)
                            .flatMap(
                                    i ->
                                            Flux.interval(Duration.ofMillis(100L))
                                                    .publishOn(Schedulers.single())
                                                    .zipWith(Flux.just(2))
                                                    .log("single-doubling")
                                                    .map(t -> t.getT2() * i)
                            );

            StepVerifier
                    .create(flux)
                    .expectNextCount(49)
                    .expectNext(100)
                    .verifyComplete();
        });
    }

2箇所にFlux#publishOnでSchedulers#singleを指定しています。

ログ。

20:30:44.331 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:30:44.410 [main] INFO  single-source-stream - onSubscribe(FluxZip.ZipCoordinator)
20:30:44.414 [main] INFO  single-source-stream - request(256)
20:30:44.531 [single-1] INFO  single-source-stream - onNext([0,1])
20:30:44.536 [single-1] INFO  single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:30:44.538 [single-1] INFO  single-doubling - request(32)
20:30:44.639 [single-1] INFO  single-source-stream - onNext([1,2])
20:30:44.639 [single-1] INFO  single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:30:44.640 [single-1] INFO  single-doubling - request(32)
20:30:44.640 [single-1] INFO  single-doubling - onNext([0,2])
20:30:44.642 [single-1] INFO  single-doubling - onComplete()
20:30:44.642 [single-1] INFO  single-source-stream - request(1)
20:30:44.728 [single-1] INFO  single-source-stream - onNext([2,3])

〜省略〜

20:30:49.328 [single-1] INFO  single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:30:49.328 [single-1] INFO  single-doubling - request(32)
20:30:49.329 [single-1] INFO  single-doubling - onNext([0,2])
20:30:49.330 [single-1] INFO  single-doubling - onComplete()
20:30:49.330 [single-1] INFO  single-source-stream - request(1)
20:30:49.428 [single-1] INFO  single-source-stream - onNext([49,50])
20:30:49.428 [single-1] INFO  single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:30:49.429 [single-1] INFO  single-doubling - request(32)
20:30:49.429 [single-1] INFO  single-source-stream - onComplete()
20:30:49.433 [single-1] INFO  single-doubling - onNext([0,2])
20:30:49.433 [single-1] INFO  single-doubling - onComplete()
20:30:49.530 [single-1] INFO  single-doubling - onNext([0,2])
20:30:49.530 [single-1] INFO  single-doubling - onComplete()
elapsed time = 5,485 msec

確かに、singleなSchedulerが使われるようになったようです。

続いて、elastic。

    @Test
    public void relationOnElastic() {
        sw(() -> {
            Flux<Integer> flux =
                    Flux.interval(Duration.ofMillis(100L))
                            .publishOn(Schedulers.elastic())
                            .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed()))
                            .log("elastic-source-stream")
                            .map(Tuple2::getT2)
                            .flatMap(
                                    i ->
                                            Flux.interval(Duration.ofMillis(100L))
                                                    .publishOn(Schedulers.elastic())
                                                    .zipWith(Flux.just(2))
                                                    .log("elastic-doubling")
                                                    .map(t -> t.getT2() * i)
                            );

            StepVerifier
                    .create(flux)
                    .expectNextCount(49)
                    .expectNext(100)
                    .verifyComplete();
        });
    }

ログ。

20:32:06.197 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:32:06.260 [main] INFO  elastic-source-stream - onSubscribe(FluxZip.ZipCoordinator)
20:32:06.264 [main] INFO  elastic-source-stream - request(256)
20:32:06.375 [elastic-2] INFO  elastic-source-stream - onNext([0,1])
20:32:06.378 [elastic-2] INFO  elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:32:06.379 [elastic-2] INFO  elastic-doubling - request(32)
20:32:06.474 [elastic-2] INFO  elastic-source-stream - onNext([1,2])
20:32:06.474 [elastic-2] INFO  elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:32:06.475 [elastic-2] INFO  elastic-doubling - request(32)
20:32:06.484 [elastic-3] INFO  elastic-doubling - onNext([0,2])
20:32:06.485 [elastic-3] INFO  elastic-doubling - onComplete()
20:32:06.485 [elastic-3] INFO  elastic-source-stream - request(1)
20:32:06.575 [elastic-2] INFO  elastic-source-stream - onNext([2,3])

〜省略〜

20:32:11.175 [elastic-2] INFO  elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:32:11.175 [elastic-2] INFO  elastic-doubling - request(32)
20:32:11.177 [elastic-8] INFO  elastic-doubling - onNext([0,2])
20:32:11.177 [elastic-8] INFO  elastic-doubling - onComplete()
20:32:11.177 [elastic-8] INFO  elastic-source-stream - request(1)
20:32:11.275 [elastic-2] INFO  elastic-source-stream - onNext([49,50])
20:32:11.276 [elastic-2] INFO  elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator)
20:32:11.276 [elastic-2] INFO  elastic-doubling - request(32)
20:32:11.277 [elastic-4] INFO  elastic-doubling - onNext([0,2])
20:32:11.277 [elastic-4] INFO  elastic-doubling - onComplete()
20:32:11.281 [elastic-4] INFO  elastic-source-stream - request(1)
20:32:11.281 [elastic-2] INFO  elastic-source-stream - onComplete()
20:32:11.378 [elastic-7] INFO  elastic-doubling - onNext([0,2])
20:32:11.378 [elastic-7] INFO  elastic-doubling - onComplete()
elapsed time = 5,455 msec

こちらも、elasticなSchedulerが使われるようになったようです。

デフォルトで用意されているSchedulerについて

Schedulersに定義されている各種staticメソッドで、定義済みのSchedulerを取得することができますが、これらの定義はどのようになっているのでしょう?

例えば、elastic。ソースコードを見ると、こんな感じの定義になっています。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java#L122-L124

	public static Scheduler elastic() {
		return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER);
	}

parallelとsingleもほぼ同様です。

immeidateだけ、ちょっと違いますね。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java#L131-L133

	public static Scheduler immediate() {
		return ImmediateScheduler.instance();
	}

elastic、parallel、singleの定義済みのSchedulerは、呼び出し元に対して、同じSchedulerのインスタンスを返却するように実装されています。

定義は、こんな感じ。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java#L483-L502

	// Internals
	static final String ELASTIC  = "elastic"; // IO stuff
	static final String PARALLEL = "parallel"; //scale up common tasks
	static final String SINGLE   = "single"; //non blocking tasks
	static final String TIMER    = "timer"; //timed tasks

	// Cached schedulers in atomic references:
	static AtomicReference<CachedScheduler> CACHED_ELASTIC  = new AtomicReference<>();
	static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>();
	static AtomicReference<CachedScheduler> CACHED_SINGLE   = new AtomicReference<>();

	static final Supplier<Scheduler> ELASTIC_SUPPLIER =
			() -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true);

	static final Supplier<Scheduler> PARALLEL_SUPPLIER = () -> newParallel(PARALLEL,
			Runtime.getRuntime()
			       .availableProcessors(),
			true);

	static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true);

Supplierの定義を見ると、だいたいわかりそうな感じですね。いずれのSchedulerも、Schedulersが提供する各種newXxxxメソッドにて実装されており、
最初に作成した結果をAtomicReferenceで保持するという形になっています。

最初の引数は、スレッド名のprefixです。

elasticの場合は第2引数がデフォルトのTTL(60秒)、parallelの場合は第2引数が生成するWorkerの数ですが、ここでは利用可能なCPUコア数、ということに
なりますね。

いずれのSchedulerも、最後の引数がtrueになっているのはdaemonスレッドとして定義するためです。

Schedulersの各種newXxxxメソッドで定義したSchedulerについて、newXxxxメソッドにはいくつかオーバーロードされたバージョンがありますが、
引数が少ないもの(daemonのON/OFFが指定できないもの)については、非daemonとして定義されます。

自分でSchedulerを定義した場合は、ちゃんと管理に注意しましょう、ってことになるでしょうね。

Schedulerを停止する場合は、Scheduler#disposeを呼び出します。
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/scheduler/Scheduler.html#dispose--

まあ、あまり新しいSchedulerのインスタンスを作らない方向の方が…という感じですね。

publishOnやsubscribeOnで指定したSchedulerは、どう使われるか?

Fluxで見てみましょう。

publishOnの場合は、FluxPublishOn内でSchedulerからWorkerが生成され、タスクがWorker#scheduleで登録されることになります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L82-L83
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L282

FluxPublishOnは内部にQueueを持っており、このQueueから取得した値を使って処理を実行していくようですね。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L307

ちゃんと確認してはいませんが、FluxPublishOn内にこのQueueに対してofferしている箇所もあるようです。

subscribeOnの場合は、subscribe時にFluxSubscribeOn内で使われるようです。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxSubscribeOn.java#L129-L147

が、requestOnSeparateThreadがfalse、またはSubscriberを動作させたスレッドとカレントスレッドが同じ場合は、そのまま同じスレッドで
動作するようです。

なお、requestOnSeparateThreadはsubscribeOn時に明示的にfalseにしない限りは、デフォルトでtrueになります。

同じようなクラスが、Monoにもあったりします。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/MonoPublishOn.java
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/MonoSubscribeOn.java

こんな感じで、だいたい使われ方のイメージはついたかも?

Flux#interbal

ところで、Flux#intervalを使うと、突然parallelなSchedulerが使われましたね?あれはどうしてなんでしょう?

これは、Flux#intervalのコードを見るとなんとなくわかります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/Flux.java#L976-L978

	public static Flux<Long> interval(Duration period) {
		return interval(period, Schedulers.parallel());
	}

parallelなScheduler、思い切り使ってますね…。

Schedulers#parallelについては、意外とReactor内部でも使っているようです。
f:id:Kazuhira:20180107211420p:image

Flux#intervalを使った場合は、Schedulerの利用方法も変わってWorker#schedulePeriodicallyが使われるようになります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxInterval.java#L68

		try {
			w.schedulePeriodically(r, initialDelay, period, unit);
		}

mainスレッド?

そういえば、最初に戻ると1番単純な例は、mainスレッド(呼び出し元のスレッド)で動いていましたね。

これは?とか思うわけですが、最初の例だとSchedulerが絡むような(Reactor内部も含めて)コードが登場しないからでしょうね。で、特にSchedulerが
使われることなく突っ走る、と。

コールスタックを見たりしていると、FluxFlatMap$FlatMapMainとかで動いているようです。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java#L182

まとめ

ちょっと気になってざっとReactorのSchedulerまわりを見ていましたが、なんとなく雰囲気は見えたような気がします。

少しは理解の足しになったのかな…。

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/Kazuhira/20180107/1515327957