Hatena::ブログ(Diary)

CLOVER

2018-07-10

KeycloakのSpring Security Adapter+Spring Boot Adapterを使ってOpenID Connect

KeycloakのSpring Boot Adapterをこの前試してみたのですが、

Keycloak 4のSpring Boot 2 Adapterを試す - CLOVER

これとは別にSpring Security Adapterがあるようです。

Spring Security Adapter

Spring Boot Adapterを見ていた時に、Spring Security Adapterがあることには気付いていたのですが、「これは別物なのかな?」という印象を持ったので、
その時は手を付けずにいました。

今回は、そのSpring Security Adapterを試してみたいと思います。

Spring Security Adapter?

文字通り、Keycloakの提供するSpring Security向けのAdapterです。アプリケーションを、Spring Securityの機能を使いつつ、Keycloakと連携させることが
できます。

Spring Security Adapter

これを書いている人は、Spring Security素人ですが。

Spring Security Adapterを使うと、Keycloakと連動してのユーザー、ロールでアプリケーションを保護でき、
またSecurityContextHolder#getContext#getAuthentication#getPrincipalで、ログイン済みであればKeycloakPrincipalが取得できるようになります。

KeycloakPrincipalが取得できると、KeycloakSecurityContextが得られるようになるため、他のJava Adapterと変わらない(java.security.Principalが起点になる)
使い方になります。

また、Spring Boot Adapterと合わせて使うこともできます。

Spring Boot Integration

Spring Bootと合わせて使うことは多いと思いますので、今回は最初からSpring Boot Adapterと合わせて試してみることにします。

というわけで、Keycloak、Spring Security Adapter、Spring Boot Adapterを使って、OpenID Connectを試してみることを、今回のゴールとしましょう。

参考にしたのは、こちら。

Easily secure your Spring Boot applications with Keycloak - RHD Blog

なお、Keycloakが提供するRestTemplateの拡張もあるようですが、今回は対象外とします。

Client to Client Support

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)


$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-24-generic", arch: "amd64", family: "unix"

Keycloakのバージョンは、4.1.0.Finalとします。

準備

Maven依存関係は、こちら。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.keycloak.bom</groupId>
                <artifactId>keycloak-adapter-bom</artifactId>
                <version>4.1.0.Final</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-spring-security-adapter</artifactId>
        </dependency>
    </dependencies>

Spring Bootと合わせて使うので、まず「spring-boot-starter-security」を入れておきます。

KeycloakのJava Adapterは、「keycloak-spring-boot-starter」と「keycloak-spring-security-adapter」の両方を指定します。

なお、KeycloakのSpring Security Adapterを依存関係に加えても、Spring Security自体への依存関係は追加されないので(optional:true)、自分で明示的に
追加する必要があります。

Keycloakには、KeycloakとWildFlyの管理ユーザーを追加。それぞれ、「keycloak-admin」と「admin」。追加したら、再起動します。

$ bin/add-user-keycloak.sh -u keycloak-admin -p password
$ bin/add-user.sh -u admin -p password
$ bin/jboss-cli.sh -c -u=admin -p=password --command=reload

KeycloakWebSecurityConfigAdapter

今回の話の中心は、KeycloakWebSecurityConfigAdapterのサブクラスになるので、こちらを主題に。
src/main/java/org/littlewings/keycloak/spring/SecurityConfig.java

package org.littlewings.keycloak.spring;

import org.keycloak.adapters.KeycloakConfigResolver;
import org.keycloak.adapters.springboot.KeycloakSpringBootConfigResolver;
import org.keycloak.adapters.springsecurity.KeycloakConfiguration;
import org.keycloak.adapters.springsecurity.authentication.KeycloakAuthenticationProvider;
import org.keycloak.adapters.springsecurity.config.KeycloakWebSecurityConfigurerAdapter;
import org.keycloak.adapters.springsecurity.filter.KeycloakAuthenticatedActionsFilter;
import org.keycloak.adapters.springsecurity.filter.KeycloakAuthenticationProcessingFilter;
import org.keycloak.adapters.springsecurity.filter.KeycloakPreAuthActionsFilter;
import org.keycloak.adapters.springsecurity.filter.KeycloakSecurityContextRequestFilter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.core.authority.mapping.SimpleAuthorityMapper;
import org.springframework.security.core.session.SessionRegistryImpl;
import org.springframework.security.web.authentication.session.RegisterSessionAuthenticationStrategy;
import org.springframework.security.web.authentication.session.SessionAuthenticationStrategy;

@KeycloakConfiguration
public class SecurityConfig extends KeycloakWebSecurityConfigurerAdapter {
    @Bean
    public KeycloakConfigResolver KeycloakConfigResolver() {
        return new KeycloakSpringBootConfigResolver();
    }

    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
        KeycloakAuthenticationProvider keycloakAuthenticationProvider = keycloakAuthenticationProvider();

        // roleを「ROLE_」としなかった場合
        // keycloakAuthenticationProvider.setGrantedAuthoritiesMapper(new SimpleAuthorityMapper());

        auth.authenticationProvider(keycloakAuthenticationProvider);
    }

    @Bean
    @Override
    protected SessionAuthenticationStrategy sessionAuthenticationStrategy() {
        return new RegisterSessionAuthenticationStrategy(new SessionRegistryImpl());
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        super.configure(http);

        http
                .authorizeRequests()
                .antMatchers("/secure/**").hasRole("USERS")
                .anyRequest().permitAll();
    }

    @Bean
    public FilterRegistrationBean keycloakAuthenticationProcessingFilterRegistrationBean(
            KeycloakAuthenticationProcessingFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakPreAuthActionsFilterRegistrationBean(
            KeycloakPreAuthActionsFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakAuthenticatedActionsFilterBean(
            KeycloakAuthenticatedActionsFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakSecurityContextRequestFilterBean(
            KeycloakSecurityContextRequestFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }
}

まずは、Spring Security Adapterの情報から見ていきましょう。

KeycloakWebSecurityConfigurerAdapterクラスを継承しつつ、@KeycloakConfigurationアノテーションを付与したクラスを作成します。
KeycloakWebSecurityConfigurerAdapterクラスは、WebSecurityConfigurerの便利な実装クラスです。

@KeycloakConfiguration
public class SecurityConfig extends KeycloakWebSecurityConfigurerAdapter {

@KeycloakConfigurationアノテーションは、こんな定義です。
https://github.com/keycloak/keycloak/blob/4.1.0.Final/adapters/oidc/spring-security/src/main/java/org/keycloak/adapters/springsecurity/KeycloakConfiguration.java

@Retention(value = RUNTIME)
@Target(value = { TYPE })
@Configuration
@ComponentScan(basePackageClasses = KeycloakSecurityComponents.class)
@EnableWebSecurity
public @interface KeycloakConfiguration {
}

なので、こちらを使うと@EnableWebSecurityは個別に指定しなくてよい、と…。

次のように、各メソッドをオーバーライドします。

    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
        KeycloakAuthenticationProvider keycloakAuthenticationProvider = keycloakAuthenticationProvider();

        // roleを「ROLE_」としなかった場合
        // keycloakAuthenticationProvider.setGrantedAuthoritiesMapper(new SimpleAuthorityMapper());

        auth.authenticationProvider(keycloakAuthenticationProvider);
    }

    @Bean
    @Override
    protected SessionAuthenticationStrategy sessionAuthenticationStrategy() {
        return new RegisterSessionAuthenticationStrategy(new SessionRegistryImpl());
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        super.configure(http);

        http
                .authorizeRequests()
                .antMatchers("/secure/**").hasRole("USERS")
                .anyRequest().permitAll();
    }

Spring Securityはデフォルトでロール名に「ROLE_」prefixを要求するのですが、それをやめたい場合はSimpleAuthorityMapperを使うようです。

        // roleを「ROLE_」としなかった場合
        // keycloakAuthenticationProvider.setGrantedAuthoritiesMapper(new SimpleAuthorityMapper());||

アクセス制御については、KeycloakWebSecurityConfigurerAdapter#configureで基本的な設定は行われています。
https://github.com/keycloak/keycloak/blob/4.1.0.Final/adapters/oidc/spring-security/src/main/java/org/keycloak/adapters/springsecurity/config/KeycloakWebSecurityConfigurerAdapter.java#L117-L136

    @Override
    protected void configure(HttpSecurity http) throws Exception {

        http
                .csrf().requireCsrfProtectionMatcher(keycloakCsrfRequestMatcher())
                .and()
                .sessionManagement()
                .sessionAuthenticationStrategy(sessionAuthenticationStrategy())
                .and()
                .addFilterBefore(keycloakPreAuthActionsFilter(), LogoutFilter.class)
                .addFilterBefore(keycloakAuthenticationProcessingFilter(), BasicAuthenticationFilter.class)
                .addFilterBefore(keycloakAuthenticatedActionsFilter(), BasicAuthenticationFilter.class)
                .addFilterAfter(keycloakSecurityContextRequestFilter(), SecurityContextHolderAwareRequestFilter.class)
                .exceptionHandling().authenticationEntryPoint(authenticationEntryPoint())
                .and()
                .logout()
                .addLogoutHandler(keycloakLogoutHandler())
                .logoutUrl("/sso/logout").permitAll()
                .logoutSuccessUrl("/");
    }

ログアウトのURLは、「/sso/logout」らしいですね、デフォルトでは。

あとはサブクラス側で、追加を。

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        super.configure(http);

        http
                .authorizeRequests()
                .antMatchers("/secure/**").hasRole("USERS")
                .anyRequest().permitAll();
    }

「/secure/**」配下を、ログインしていてロール「ROLE_USERS」を保持している場合にアクセス可能に設定。

Spring Security Adapterの設定としては、とりあえずここまで。

続いて、Spring Boot Adapterと合わせて使うための話。

Spring Security AdapterとSpring Boot Adapterを合わせて使う場合は、keycloak.jsonではなくapplication.propertiesなどからKeycloakに関する情報を
取得するようにします。というわけで、KeycloakSpringBootConfigResolverを@Bean定義。

Using Spring Boot Configuration

    @Bean
    public KeycloakConfigResolver KeycloakConfigResolver() {
        return new KeycloakSpringBootConfigResolver();
    }

また、Spring Boot AdapterがServlet Filterを登録するのですが、KeycloakWebSecurityConfigurerAdapterでさらに同じServlet Filterを登録しようと
するため、これを回避するのにFilterRegistrationBeanを使ってServlet Filterをひとつ無効化します。

Avoid double Filter bean registration

    @Bean
    public FilterRegistrationBean keycloakAuthenticationProcessingFilterRegistrationBean(
            KeycloakAuthenticationProcessingFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakPreAuthActionsFilterRegistrationBean(
            KeycloakPreAuthActionsFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakAuthenticatedActionsFilterBean(
            KeycloakAuthenticatedActionsFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

    @Bean
    public FilterRegistrationBean keycloakSecurityContextRequestFilterBean(
            KeycloakSecurityContextRequestFilter filter) {
        FilterRegistrationBean registrationBean = new FilterRegistrationBean(filter);
        registrationBean.setEnabled(false);
        return registrationBean;
    }

どこが重複するかというのは、先ほどKeycloakWebSecurityConfigurerAdapter#configureで出てきました。

    @Override
    protected void configure(HttpSecurity http) throws Exception {

        http
                .csrf().requireCsrfProtectionMatcher(keycloakCsrfRequestMatcher())
                .and()
                .sessionManagement()
                .sessionAuthenticationStrategy(sessionAuthenticationStrategy())
                .and()
                .addFilterBefore(keycloakPreAuthActionsFilter(), LogoutFilter.class)
                .addFilterBefore(keycloakAuthenticationProcessingFilter(), BasicAuthenticationFilter.class)
                .addFilterBefore(keycloakAuthenticatedActionsFilter(), BasicAuthenticationFilter.class)
                .addFilterAfter(keycloakSecurityContextRequestFilter(), SecurityContextHolderAwareRequestFilter.class)
                .exceptionHandling().authenticationEntryPoint(authenticationEntryPoint())
                .and()
                .logout()
                .addLogoutHandler(keycloakLogoutHandler())
                .logoutUrl("/sso/logout").permitAll()
                .logoutSuccessUrl("/");
    }

なんとも言えない気もしますが、こういう構成だということで。

これで、Spring Security Adapterを使う設定ができました、と。

Controllerを書く

では、アプリケーションのメインとなるControllerを書いていきます。

未ログインでもアクセスできるものと、ログインが必要なものの2つを用意しましょう。といっても、ログインが必要かどうかはConfigurationの方で決めて
いるのですが。

まずは、未ログインでもアクセスできる(想定)のController。
src/main/java/org/littlewings/keycloak/spring/controller/PublicController.java

package org.littlewings.keycloak.spring.controller;

import java.security.Principal;
import java.util.LinkedHashMap;
import java.util.Map;

import org.keycloak.KeycloakPrincipal;
import org.keycloak.KeycloakSecurityContext;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("public")
public class PublicController {
    @GetMapping("hello")
    public String hello() {
        return "Hello Application!!";
    }

    @GetMapping("user")
    public Object user() {
        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

        Map<String, Object> results = new LinkedHashMap<>();

        if (authentication.getPrincipal() instanceof Principal) {
            Principal principal = (Principal) authentication.getPrincipal();

            if (principal != null) {
                results.put("principal-type", principal.getClass().getName());
                results.put("principal-name", principal.getName());
            }

            KeycloakSecurityContext context =
                    ((KeycloakPrincipal) principal).getKeycloakSecurityContext();
            if (context != null) {
                results.put("id-token", context.getIdToken());
                results.put("roles", context.getToken().getRealmAccess().getRoles());
            }
        } else {
            results.put("not-authenticated", authentication.getPrincipal());
        }

        return results;
    }
}

次に、ログインしてロールを持っているとアクセスできる(想定の)Controller。
src/main/java/org/littlewings/keycloak/spring/controller/SecureController.java

package org.littlewings.keycloak.spring.controller;

import java.security.Principal;
import java.util.LinkedHashMap;
import java.util.Map;

import org.keycloak.KeycloakPrincipal;
import org.keycloak.KeycloakSecurityContext;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("secure")
public class SecureController {
    @GetMapping("hello")
    public String hello() {
        return "Hello Secure Application!!";
    }

    @GetMapping("user")
    public Object user() {
        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = (Principal) authentication.getPrincipal();

        if (principal != null) {
            results.put("principal-type", principal.getClass().getName());
            results.put("principal-name", principal.getName());
        }

        KeycloakSecurityContext context =
                ((KeycloakPrincipal) principal).getKeycloakSecurityContext();
        if (context != null) {
            results.put("id-token", context.getIdToken());
            results.put("roles", context.getToken().getRealmAccess().getRoles());
        }

        return results;
    }
}

大雑把に言ってどちらもそう変わらないのですが、Keycloakを介してのログイン後であれば、Spring SecurityからKeycloakPrincipalを取得できるようになります。

        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = (Principal) authentication.getPrincipal();

        if (principal != null) {
            results.put("principal-type", principal.getClass().getName());
            results.put("principal-name", principal.getName());
        }

        KeycloakSecurityContext context =
                ((KeycloakPrincipal) principal).getKeycloakSecurityContext();
        if (context != null) {
            results.put("id-token", context.getIdToken());
            results.put("roles", context.getToken().getRealmAccess().getRoles());
        }

こうなると、あとは他のJava Adapterと同様にKeycloakSecurityContextを得ることができます。

Security Context

起動クラス

アプリケーションの起動クラスは、簡単に。
src/main/java/org/littlewings/keycloak/spring/App.java

package org.littlewings.keycloak.spring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

これで、アプリケーションの準備は完了です。
※ログアウトは、POSTでCSRFトークン付きで「/sso/logout」にリクエストするView作るのが面倒でやめました…

設定ファイル

application.propertiesに、Keycloakの設定を書いていきます。

今回はKeycloakに対して、以下のセットアップ前提としています。

  • Realm名は「demo-api
  • 「sample-rest-api」というClientを追加済み
  • ClientのAccess Typeは「confidential」

src/main/resources/application.properties

keycloak.realm = demo-api
keycloak.auth-server-url = http://172.17.0.2:8080/auth
keycloak.resource = sample-rest-api
keycloak.credentials.secret = c97b8277-28a4-4ef5-b9a0-db43e7649670

spring.jackson.serialization.indent_output=true

RestControllerがJSONを返すので、わかりやすいようにインデントするようにしています。

Keycloakのアカウント

(面倒になったので)設定しているキャプチャは省略します。

「demo-api」Realmに対して、次の2つのロールを作成しました。

  • 「Roles」を選び、「Add Role」を選択
  • 「ROLE_USERS」と「ROLE_OTHERS」を作成

これを、次の状態のユーザーを作成してロールを紐付け

  • api-user」 … 「ROLE_USERS」を紐付け
  • 「test-user」 … 「ROLE_OTHERS」を紐付け

api-user」に対して、ロールを紐付けた画面はこちら。
f:id:Kazuhira:20180710233507p:image

確認

それでは、アプリケーションを起動して確認してみます。

パッケージング&起動。

$ mvn package

$ $ java -jar target/spring-security-adapter-example-0.0.1-SNAPSHOT.jar

ログイン前だと、「/public」配下には自由にアクセスできます。が、ユーザーの情報(KeycloakSecurityContext)を扱う処理については、「anonymousUser」で
アクセスしていることになっています。

$ curl localhost:8080/public/user
{
  "not-authenticated" : "anonymousUser"
}

Authentication#getPrincipalの結果が、Stringなんですねぇ。

        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

        if (authentication.getPrincipal() instanceof Principal) {
            // 省略
        } else {
            results.put("not-authenticated", authentication.getPrincipal());
        }

「/secure」配下だと、ログインを求められるので、Keycloakのログイン画面にリダイレクト後、アクセス可能なロールを持った「api-user」でログインして
戻ってくるとユーザーの情報が表示されます。
※これは、ユーザーの情報を表示する「http://localhost:8080/secure/user」にアクセスした例です
f:id:Kazuhira:20180710234309p:image

ログインしてしまえば、「/public/user」でもユーザーの情報を表示することができます。
f:id:Kazuhira:20180710234540p:image

1度セッションを切って、今度は「test-user」(アクセス許可されたロールを持っていないユーザー)でアクセスすると、Keycloakへのログイン後であっても
「/secure」配下のアクセスは拒否されます。
f:id:Kazuhira:20180710234821p:image

まとめ

KeycloakのAdapterのうち、Spring Security AdapterとSpring Boot Adapterを組み合わせて使ってみました。

Spring Securityはほとんど知らないのですが、漠然とUserDetialsとかのキーワードだけ覚えていて、そのあたりが出てくるのかなーと思っていましたが、
今回特に出ず…。

全体的に、Keycloakに寄せる統合の仕方をしているAdapterだなぁと思いました(KeycloakSecurityContextを意識するあたり)。

1度、Spring SecurityだけでKeycloakを使ってOpenID Connectを試すことにも、そのうちチャレンジしてみようかなと思います。

2018-07-07

Infinispan 9.3でWrite BehindなCacheWriterが、Fault Tolerantになったという話

Infinispan 9.3の新機能ネタです。

InfinspanにはCacheの背後のデータストアに対してデータを永続化したり読み出したりする、Persistence(CacheLoader/CacheWriter)の
仕組みがあるのですが、この中でWrite BehindなCacheWriterがInfinispan 9.3からFault Tolerantになりました、と。

Write-behind stores are now fault-tolerant by default.

https://blog.infinispan.org/2018/06/infinispan-930final-is-out.html

Write Behind?

Write Behindというのは、Cacheに書き込まれたデータを、データストアに非同期に反映する方法です。
対となるのはWrite Throughで、こちらは同期書き込みになります。

Write Behindでは、Cacheへの変更内容をキューに入れ、別スレッドでデータストアに非同期に反映されます。この時に
使われるのが、CacheWriterまたはAdvancedCacheWriterの実装です。

ちなみに、変更キューがいっぱいになった場合は、キューに受け入れ可能になるまでWrite Throughと同じになるそうな。

Persistence / Write-Behind (Asynchronous)

Fault Tolerant?

Infinispan 9.2まではWrite Behindに構成されたCacheWriterは、背後にあるデータストアの書き込みに失敗した場合は
既定回数(3回)リトライし、それでもダメだった場合は諦める=該当の変更はデータストアには反映されない、という
実装でした。

これがInfinispan 9.3では、改善されたという話です。

Add Fault-tolerance to write-behind stores

CacheWriterに対して、isAvailableというメソッドが追加されました。(これは、CacheLoaderにも追加されていますが)

CacheWriter (Infinispan JavaDoc All 9.3.0.Final API)

このisAvailableメソッドがfalseを返している間はデータストアへの反映を中止しキューに反映を積み込むだけとなり、
CacheWriter#isAvailableがtrueになったらデータストアへの反映を再開します。

今回は、この機能について見ていこうと思います。

環境

今回確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ sbt sbtVersion
[info] 1.1.6

使用するInfinispanは、9.3です。

準備

sbtでの依存関係は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.3.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

Infinispanは、Embedded Modeで利用します。また、確認はScalaTestを使ったテストコードで行います。

CacheStore(CacheWriter/AdvancedCacheWriter)を作成する

今回の確認で使うCacheStore(CacheWriter…というか、実際はAdvancedLoadWriteStoreですが…)は、今回はインメモリ(Map)にデータを持つように作成します。
src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStore.scala

package org.littlewings.infinispan.writebehind

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.concurrent.{ConcurrentHashMap, Executor}

import org.infinispan.commons.marshall.StreamingMarshaller
import org.infinispan.marshall.core.{MarshalledEntry, MarshalledEntryFactory}
import org.infinispan.metadata.InternalMetadata
import org.infinispan.persistence.spi.{AdvancedCacheWriter, AdvancedLoadWriteStore, InitializationContext}
import org.infinispan.util.TimeService
import org.jboss.logging.Logger

import scala.collection.JavaConverters._

object InMemoryCacheStore {
  val COUNTER: AtomicInteger = new AtomicInteger(0)
  val AVAILABLE: AtomicBoolean = new AtomicBoolean(true)

  private val CURRENT_STORE: AtomicReference[InMemoryCacheStore[_, _]] = new AtomicReference[InMemoryCacheStore[_, _]]

  def currentStoreEntries[K, V]: Map[K, V] =
    CURRENT_STORE.get.underlyingStore.map { case (k, (v, _)) => (k.asInstanceOf[K], v.asInstanceOf[V]) }.toMap
}

class InMemoryCacheStore[K, V] extends AdvancedLoadWriteStore[K, V] {
  val logger: Logger = Logger.getLogger(getClass)

  var configuration: InMemoryCacheStoreConfiguration = _
  var marshaller: StreamingMarshaller = _
  var marshalledEntryFactory: MarshalledEntryFactory[K, V] = _
  var timeService: TimeService = _

  val underlyingStore: scala.collection.mutable.Map[K, (V, InternalMetadata)] =
    new ConcurrentHashMap[K, (V, InternalMetadata)]().asScala

  val failedKeyCounter: scala.collection.mutable.Map[K, Int] =
    new ConcurrentHashMap[K, Int]().asScala

  override def start(): Unit = {
    logger.infof("InMemoryCacheLoadWriteStore started")

    InMemoryCacheStore.CURRENT_STORE.set(this)
  }

  override def stop(): Unit = {
    logger.infof("InMemoryCacheLoadWriteStore stopped")
  }

  override def init(ctx: InitializationContext): Unit = {
    configuration = ctx.getConfiguration.asInstanceOf[InMemoryCacheStoreConfiguration]
    marshaller = ctx.getMarshaller
    marshalledEntryFactory = ctx.getMarshalledEntryFactory.asInstanceOf[MarshalledEntryFactory[K, V]]
    timeService = ctx.getTimeService

    logger.infof("InMemoryCacheLoadWriteStore initialized")
  }

  override def size(): Int = underlyingStore.size

  override def clear(): Unit = underlyingStore.clear()

  override def purge(threadPool: Executor, listener: AdvancedCacheWriter.PurgeListener[_ >: K]): Unit = {
    logger.infof("purge started")

    val now = timeService.wallClockTime()

    underlyingStore
      .foreach { case (k, (_, meta)) =>
        if (meta.isExpired(now)) {
          logger.infof("purge key = %s", k)
          listener.entryPurged(k)
        }
      }

    logger.infof("purge end")
  }

  override def isAvailable: Boolean = InMemoryCacheStore.AVAILABLE.get

  override def write(entry: MarshalledEntry[_ <: K, _ <: V]): Unit = {
    val key = entry.getKey
    val value = entry.getValue

    failedKeyCounter
      .get(key)
      .orElse(Option(0))
      .filter(_ < InMemoryCacheStore.COUNTER.get)
      .foreach { v =>
        failedKeyCounter.put(key, v + 1)
        logger.infof("Oops!! key = %s, failed-count = %d", key, v)
        throw new RuntimeException(s"Oops!! key = ${key} failed-count = ${v}")
      }

    failedKeyCounter.put(key, 0)

    logger.infof("write key = %s, value = %s", key, value)

    underlyingStore.put(key, (value, entry.getMetadata))
  }

  override def delete(key: scala.Any): Boolean = {
    logger.infof("delete key = %s", key)
    underlyingStore.remove(key.asInstanceOf[K]).isDefined
  }

  override def load(key: scala.Any): MarshalledEntry[K, V] = {
    val loaded = underlyingStore
      .get(key.asInstanceOf[K])
      .map { case (value, metadata) => marshalledEntryFactory.newMarshalledEntry(key, value, metadata) }
      .orNull

    logger.infof("loaded key = %s, value = %s", key, loaded)

    loaded
  }

  override def contains(key: scala.Any): Boolean = underlyingStore.contains(key.asInstanceOf[K])
}

こんな感じで、CacheStoreで読み書きするデータは、Mapで管理。

  val underlyingStore: scala.collection.mutable.Map[K, (V, InternalMetadata)] =
    new ConcurrentHashMap[K, (V, InternalMetadata)]().asScala

テストでの確認目的で、このCacheWriterは外部から操作できるようにしておきます。

CacheWriter#write時に任意の回数失敗させられたり、CacheWriter#isAvailableの値をコントロールしたり、現在のMapの値を取得できるようにしたり。

object InMemoryCacheStore {
  val COUNTER: AtomicInteger = new AtomicInteger(0)
  val AVAILABLE: AtomicBoolean = new AtomicBoolean(true)

  private val CURRENT_STORE: AtomicReference[InMemoryCacheStore[_, _]] = new AtomicReference[InMemoryCacheStore[_, _]]

  def currentStoreEntries[K, V]: Map[K, V] =
    CURRENT_STORE.get.underlyingStore.map { case (k, (v, _)) => (k.asInstanceOf[K], v.asInstanceOf[V]) }.toMap
}

任意の回数CacheWriter#writeを失敗させたり、CacheWriter#isAvailableの値をコントロールしているのは、このあたり。

  override def isAvailable: Boolean = InMemoryCacheStore.AVAILABLE.get

  override def write(entry: MarshalledEntry[_ <: K, _ <: V]): Unit = {
    val key = entry.getKey
    val value = entry.getValue

    failedKeyCounter
      .get(key)
      .orElse(Option(0))
      .filter(_ < InMemoryCacheStore.COUNTER.get)
      .foreach { v =>
        failedKeyCounter.put(key, v + 1)
        logger.infof("Oops!! key = %s, failed-count = %d", key, v)
        throw new RuntimeException(s"Oops!! key = ${key} failed-count = ${v}")
      }

    failedKeyCounter.put(key, 0)

    logger.infof("write key = %s, value = %s", key, value)

    underlyingStore.put(key, (value, entry.getMetadata))
  }

お行儀良くないですが、staticにしてテストコード側からコントロールします。

あとは、このCacheStoreを作成するためのConfigurationBuilderおよびConfigurationを作成。

// src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStoreConfigurationBuilder.scala
package org.littlewings.infinispan.writebehind

import org.infinispan.configuration.cache.{AbstractStoreConfiguration, AbstractStoreConfigurationBuilder, PersistenceConfigurationBuilder}

class InMemoryCacheStoreConfigurationBuilder(builder: PersistenceConfigurationBuilder)
  extends AbstractStoreConfigurationBuilder[InMemoryCacheStoreConfiguration, InMemoryCacheStoreConfigurationBuilder](builder, AbstractStoreConfiguration.attributeDefinitionSet) {
  override def self(): InMemoryCacheStoreConfigurationBuilder = this

  override def create(): InMemoryCacheStoreConfiguration = new InMemoryCacheStoreConfiguration(attributes.protect, async.create, singletonStore.create)
}

// src/main/scala/org/littlewings/infinispan/writebehind/InMemoryCacheStoreConfiguration.scala
package org.littlewings.infinispan.writebehind

import org.infinispan.commons.configuration.attributes.AttributeSet
import org.infinispan.commons.configuration.{BuiltBy, ConfigurationFor}
import org.infinispan.configuration.cache.{AbstractStoreConfiguration, AsyncStoreConfiguration, SingletonStoreConfiguration}

@BuiltBy(classOf[InMemoryCacheStoreConfigurationBuilder])
@ConfigurationFor(classOf[InMemoryCacheStore[_, _]])
class InMemoryCacheStoreConfiguration(
                                       attributes: AttributeSet,
                                       async: AsyncStoreConfiguration,
                                       singletonStore: SingletonStoreConfiguration
                                     ) extends AbstractStoreConfiguration(attributes, async, singletonStore) {
}

これで、CacheStore(CacheWriter)側の準備はおしまいです。

テストコードの雛形

では、作成したCacheStoreを使って確認する側の、テストコードへ移ります。

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/writebehind/FaultTolerantCacheStoreSpec.scala

package org.littlewings.infinispan.writebehind

import java.util.concurrent.TimeUnit

import org.infinispan.Cache
import org.infinispan.configuration.cache.{AsyncStoreConfigurationBuilder, ConfigurationBuilder}
import org.infinispan.manager.DefaultCacheManager
import org.jboss.logging.Logger
import org.scalatest.{FunSuite, Matchers}

class FaultTolerantCacheStoreSpec extends FunSuite with Matchers {
  val logger: Logger = Logger.getLogger(getClass)

  // ここに、テストを書く!!

  protected def withCache[K, V](cacheName: String, persistenceBuilder: ConfigurationBuilder => AsyncStoreConfigurationBuilder[InMemoryCacheStoreConfigurationBuilder])(fun: Cache[K, V] => Unit): Unit = {
    val configuration = persistenceBuilder(new ConfigurationBuilder).build()

    val manager = new DefaultCacheManager
    manager.defineConfiguration(cacheName, configuration)

    try {
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      manager.stop()
    }
  }
}

Cacheを簡易的に扱えるメソッドを付けていますが、テスト側でCacheStoreの設定ができるようにはしています。

とりあえず、Write BehindなCacheStoreを使ってみる

それでは最初に、特にFault Tolerantとか気にせずに、ふつうにWrite BehindなCacheStoreを使ってみます。

  test("write-behind store, simply") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(0)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )
    }
  }

API上では、async#enableとすると、Write BehindなCacheStoreになります。

        configuration
          .persistence
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled

XMLで書く場合は、write-behindタグになるようですけれどね。

<write-behind modification-queue-size="123" thread-pool-size="23" />

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#write_behind_asynchronous

CacheStoreへの書き込み失敗数は0で

      InMemoryCacheStore.COUNTER.set(0)

非同期に書き込むことになるので、一応少しsleepしてから、CacheStoreに書き込まれた内容を見るようにしています。

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )

Write Behindに関係する設定

Write Behindに関する設定はいくつかあって、まずはWrite Throughの時にも使われる設定から。

XMLで設定する場合はpersistenceタグの属性として設定するもので、以下に記載があります。

Persistence / Configuration


項目名意味Write Behind時の効果デフォルト値
connection-attemptsCacheWriter/CacheLoader開始時に失敗した場合の試行回数非同期書き込み時に失敗した場合のリトライ回数10
availability-intervalPersistenceManager(の背後にあるCacheWriter/CahceLoader)が使えるかどうか、定期的に確認する間隔リトライ時のインターバル1000

その他は、割愛します。

また、Write Behindのみの設定で、ポイントとなるのはこちら。


項目名意味デフォルト値
modification-queue-size非同期書き込みキューの最大エントリ数。これがいっぱいになると、Write Throughと同じ状態になり、次のエントリが受け入れられるようになるまで待機する1024
thread-pool-sizeCacheStoreに変更を適用する際のスレッドプールサイズ1
fail-silentlyfalse

fail-silentlyだけ長いので、ちょっと別に。

fail-silentlyはデフォルトで無効(false)で、Write Behindによる非同期書き込み時に失敗した場合、以下の条件を満たしていると変更を保留、待機します。

  • リトライの試行回数がconnection-attempts回に到達していない
  • CacheWriter#isAvailableがfalseを返す

この状態だと、CacheWriterを停止させ、CacheStoreが回復する(CacheWriter#isAvailableがtrueを返す)まで、非同期にCacheStoreに変更を反映するスレッドを
停止させます。

これを有効(true)にすると、CacheStoreへの書き込みに失敗する状態で、connection-attemptsに設定した回数のリトライを超えると、その変更は失われてしまいます。

このあたりの説明は、こちらのConfiguration Schemaを見るとよいでしょう。

Configuration Schema / urn:infinispan:config:9.3

書き込みを全部失敗させてみる

続いては、書き込みを全部失敗させてみます。

  test("write-behind store, write failed") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(2) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(3)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(6L) // waiting, all retry failed...

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)
    }
  }

リトライ数は2回で、ポーリング間隔は1秒(デフォルト)に対して、

         .connectionAttempts(2) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

書き込み失敗数は3。

      InMemoryCacheStore.COUNTER.set(3)

当然のことながら、すべてのCacheStoreへの書き込みは失敗し、

7 07, 2018 5:11:00 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:11:01 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:11:02 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!
7 07, 2018 5:11:02 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:11:03 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:11:04 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!
7 07, 2018 5:11:04 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 0
7 07, 2018 5:11:05 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 1
7 07, 2018 5:11:06 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 2 retries!

CacheStoreへはなにも書き込まれません。

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)

リトライの間隔が1秒なのは、availability-intervalが1,000ミリ秒(1秒)だからですね。

7 07, 2018 5:11:00 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:11:01 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1

このパターンで失敗した書き込みは、そのままロストしてしまいます。CacheStoreへは、反映されることはありません。

これ、けっこうポイントだったりします。それはまた後で。

失敗するCacheStoreへの書き込みを、後で再開させる

では、失敗するCacheStoreへの書き込みを、少し後で再開させるようにしてみましょう。

サンプルコードは、こちら。

  test("write-behind store, fault-tolerant, simply") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(2)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(1L)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      TimeUnit.SECONDS.sleep(3L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      TimeUnit.SECONDS.sleep(5L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )
    }
  }

リトライ回数が2、インターバル1秒に対して

          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

失敗数が2回です。

      InMemoryCacheStore.COUNTER.set(2)

また、途中でCacheWriter#isAvailableがfalseを返すように変更し

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

途中でtrueに返すようにしています。

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

最終的には、全部の書き込みがCacheStoreへ反映されています。

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1", "key2" -> "value2", "key3" -> "value3"
      )

これ、どういう挙動になるかというと

7 07, 2018 5:18:50 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = false
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:18:54 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = true
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:18:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:18:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:18:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 0
7 07, 2018 5:18:58 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key3, failed-count = 1
7 07, 2018 5:18:59 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3

CacheWriter#isAvailableがfalseを返すようにすると、しばらく書き込みの失敗が止まります。

7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = false
7 07, 2018 5:18:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:18:54 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$9
INFO: set store availability = true

失敗する回数自体は途中で変えていないので、CacheWriter#isAvailableがtrueを返すようにすると、また失敗し続けますが…。

INFO: set store availability = true
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:18:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:18:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1

これ、どういうことかというと、InfinispanのPersistenceの仕組みの中で、CacheWriter#isAvailableがfalseを返すようになると(CacheLoader#isAvailableも
そうですが)、そのCacheStoreへのアクセスを停止するようになっているからです。

CacheWriter#isAvailableがtrueになると、書き込みを再開します。

で、今回の例では、設定した数だけ失敗したあとにCacheStoreへの書き込みが完了します、と。

このあたりは、また後で見てみましょう。

CacheStoreが停止中に、同じキーに対して書き込みを行ったら?

CacheWriter#isAvailableがfalseとなるとCacheStoreへの書き込みを停止し、trueになると再開することがわかりました。

では、CacheWriter#isAvailableがfalseの間に、同じキーに対して書き込みをしたらどうなるでしょう?

  test("write-behind store, fault-tolerant, duplicate") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(5) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(4)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      cache.put("key1", "value1-1")
      cache.put("key2", "value2-2")
      cache.put("key3", "value3-3")

      TimeUnit.SECONDS.sleep(1L)


      TimeUnit.SECONDS.sleep(1L)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      TimeUnit.SECONDS.sleep(1L)

      InMemoryCacheStore.COUNTER.set(0)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      TimeUnit.SECONDS.sleep(6L)

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1-1", "key2" -> "value2-2", "key3" -> "value3-3"
      )
    }
  }

意外とすんなり反映されました?

      InMemoryCacheStore.currentStoreEntries[String, String] should contain only(
        "key1" -> "value1-1", "key2" -> "value2-2", "key3" -> "value3-3"
      )

ログを見ても、それっぽく並んでいます。

7 07, 2018 5:31:51 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:31:52 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:31:53 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 2
7 07, 2018 5:31:54 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 3
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$15
INFO: set store availability = false
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1
7 07, 2018 5:31:55 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$15
INFO: set store availability = true
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1-1
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3-3
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2-2

順番は崩れないのかな?

でも、よくよく見ると「key3 / value3」の組み合わせの書き込みがなくなったりしていますね。

7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2
7 07, 2018 5:31:56 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key1, value = value1-1
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key3, value = value3-3
7 07, 2018 5:31:57 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore write
INFO: write key = key2, value = value2-2

これはどこに行ったのでしょう?

fail-silentlyをtrueにしてみる

最後に、fail-silentlyをtrueにしてみましょう。

  test("write-behind store, fault-tolerant, disabled") {
    withCache[String, String](
      "write-behind",
      configuration =>
        configuration
          .persistence
          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)
          .addStore(classOf[InMemoryCacheStoreConfigurationBuilder])
          .async.enable // write-behind enabled
          .failSilently(true)  // default false
    ) { cache =>
      InMemoryCacheStore.COUNTER.set(3)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)
    }
  }

fail-silentlyをfalseにして

          .failSilently(true)  // default false

あとはシンプルに。書き込み失敗数は3。最初に、CacheWriter#isAvailableがfalseになるようにしています。

      InMemoryCacheStore.COUNTER.set(3)

      logger.infof("set store availability = false")
      InMemoryCacheStore.AVAILABLE.set(false)

      cache.put("key1", "value1")
      cache.put("key2", "value2")
      cache.put("key3", "value3")

      TimeUnit.SECONDS.sleep(2L)

      logger.infof("set store availability = true")
      InMemoryCacheStore.AVAILABLE.set(true)

      InMemoryCacheStore.currentStoreEntries[String, String] should be(empty)

リトライ数は3です。つまり、書き込みはすべて失敗するシナリオなのですが。

          .connectionAttempts(3) // retry-count
          .availabilityInterval(1000) // store-state polling-interval (default)

これ、どういう挙動になるかというと、CacheWriter#isAvailableがfalseを返すように設定しているにも関わらず、CacheStoreへ書き込もうとします。

7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 2
7 07, 2018 5:33:26 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 3 retries!
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 1
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key2, failed-count = 2
7 07, 2018 5:33:26 午後 org.infinispan.persistence.async.AsyncCacheWriter$AsyncStoreProcessor retryWork
WARN: ISPN000053: Unable to process some async modifications after 3 retries!
7 07, 2018 5:33:28 午後 org.littlewings.infinispan.writebehind.FaultTolerantCacheStoreSpec $anonfun$new$18
INFO: set store availability = true

あと、書き込みが失敗しても特にインターバルなくリトライしに行っています。

7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 0
7 07, 2018 5:33:26 午後 org.littlewings.infinispan.writebehind.InMemoryCacheStore $anonfun$write$3
INFO: Oops!! key = key1, failed-count = 1

ある意味で、ストレートな挙動ですね。

これで、だいたいWrite BehindなCache StoreのFault Tolerantに関する設定は見れたのかなと思います。

もうちょっと中身を

それでは、せっかくなのでもうちょっと中身の方を。

CacheStoreをWrite Behindに構成すると、CacheWriterがAsyncCacheWriter(AdvancedCacheWriterの場合はAdvancedAsyncCacheWriter)にラップされます。
これは、PersistenceManagerImplにて行われます。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L824-L827

AdvancedAsyncCacheWriterは、AsyncCacheWriterのサブクラスなので、以降は大筋のAsyncCacheWriterにフォーカスして書いていきます。

CacheStoreへの反映を制御しているのは、AsyncStoreProcessorというクラスになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L407

書き込みのタスク実行時に、リトライ回数としてconection-attemptsが使われます。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L423

で、CacheStoreへの書き込みが失敗する(=例外がスローされる)場合は、connection-attempts回数分だけリトライを繰り返してそれでもダメな場合は
諦める、という流れになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L437-L481

書き込みを指示している箇所は、こちら。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L463

失敗した時は、connection-attemptsで指定されたリトライ回数に達していない場合はavailability-intervalで指定したミリ秒分だけ待機します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L473

ここで、定期的に確認しているCacheWriter#isAvailableがfalseになっていることを確認できた場合は、スレッドが待機状態になります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L453

これには、Lock#newConditionから得られるConditionを使用しています。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L85

また、AsyncStoreProcessorの実行指示をするのはAsyncStoreCoordinatorというクラスなのですが、こちらも定期的に確認しているCacheWriter#isAvailableがfalseに
なっている場合は、スレッドが待機状態になります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L305

ちなみに、AsyncStoreCoordinatorは1スレッドで
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L128-L129

AsyncStoreProcessorは、write-behind / thread-pool-sizeで指定された値(デフォルト1)のスレッド数で動作します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L116-L119

ここで、CacheWriter#isAvailableを確認する処理はAsyncCacheWriterに書かれているのですが
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L165-L194

これを定期的に確認しているのはPersistenceManagerImplとなります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L156-L157

この確認間隔も、availability-intervalによって決まります。

で、先程CacheWriter#isAvailableがfalseを返した場合、AsyncStoreCoordinatorやAsyncStoreProcessorを動かしているスレッドが待機状態に入るという
ことを書きましたが、これを起こすのがこちらの処理。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L185

CacheWriter#isAvailableがfalseからtrueに切り替わったことが確認できたら、Condition#signalAllでスレッドを起こすという仕組みです。

なので、このFault Tolerantの仕組みは

  • CacheWriterの書き込み処理(CacheStoreへの書き込み処理)は、失敗しても規定回数(connection-attempts)リトライされる
    • ここで、繰り返しまではavailability-intervalミリ秒の待機がある
  • このリトライが、規定回数終わっても失敗したままだと、更新内容はCacheStoreへは反映されなくなる
  • ここで、CacheWriter#isAvailableでCacheStoreへの書き込み可能な状態をコントロールすることで、CacheStoreが回復するまで更新処理を待機させる

ということで、CacheStoreへの書き込みに失敗する場合は、リトライ回数に到達するまでにCacheWriter#isAvailableが正しく制御されて更新を停止し、
回復したらCacheWriter#isAvailableにもそれが反映されるということが行われた前提で成立しています。

例えば、JdbcCacheStoreはConnection#isValidの結果を、CacheWriter#isAvailableとして利用しています。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/persistence/jdbc/src/main/java/org/infinispan/persistence/jdbc/stringbased/JdbcStringBasedStore.java#L175

で、こうなると変更を溜めていくキューの仕組みが気になるところですが、ここで使われているキューは、実際にはロックの許容数です。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L254-L263

write-behind / modification-queue-sizeというのは、ロック取得可能な数を指定することのようですね。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L112

というわけで、modification-queue-sizeはNode単位の設定ということが言えるでしょう。

この変更内容はStateというものに入れられるのですが、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L260

Stateの内部では変更がModificationの積み上げとして表現されていますが、その管理単位はキー単位となります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/State.java#L90-L101

このため、短時間で同じキーに対して変更を入れると、最後だけが残ったりするのでしょうね。

また、Stateはチェーン構成になっていて、それをたどる処理がところどころにあったりします。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L320-L325
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L428-L430

最後にfail-silentlyですが、AsyncCacheWriterでそこそこ登場しており、これがtrueだとCacheWriter#isAvailableの変更結果を気にしないとか
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L170-L171

ここまでに説明したConditionでのスレッドの待機や、リトライ時のavailability-intervalでの待機時間を無視するといった感じになります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L299
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L442-L455
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L469-L478

代わりに、CacheWriter#isAvailableが呼ばれなくはなりますね。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/async/AsyncCacheWriter.java#L171

あと、Write Behindとは関係なくオマケ的になのですが、CacheStoreの起動時に失敗してしまうような場合は、connection-attempts回数分だけリトライし、
その時の待機時間はconnection-intervalで決まるみたいですよ。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java#L870-L893

ここは、availability-intervalじゃないんですねぇ、と。

まとめ

今回は、Infinispan 9.3でFault TolerantになったというWrite BehindなCache Writerを見てみました。

あんまりWrite Behindの裏側って見てこなかったので、いろいろと参考になりましたねぇ。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-write-behind-store-fault-tolerant

2018-06-30

Infinispan 9.3で追加された、Hot Rod Transactionを試す

先日、Infinispan 9.3.0.Finalがリリースされました。

Infinispan 9.3.0.Final is out!

これからちょっとずつ見ていこうと思いますが、まず最初に気になったのはこちら。

Transaction support Hot Rod. The java Hot Rod client can participate in Java transactions via Synchronization or XA enlistment. Note that recovery isn't supported yet.

https://blog.infinispan.org/2018/06/infinispan-930final-is-out.html

Hot Rodで、トランザクションがサポートされたようです。リカバリこそできないものの、JTAのSynchronization、もしくは
XAリソースとしてトランザクションに参加可能な模様。

ドキュメントは、こちらです。

Hot Rod Transaction

というわけで、今回はこちらを試してみます。

Hot Rod Transaction?

中身に入っていく前に、ドキュメントからもうちょっとHot Rod Transactionについて掘り下げてみましょう。

Hot Rod Transaction

Hot Rodの(Java)Clientを、JTAトランザクションに参加させることができる機能です。

書き込み準備時にロックを取る、とありますが…?

The transactions are optimistic in a way that the write locks are acquired at prepare time.

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#hot_rod_transaction

Cacheは、Server側とClient側それぞれでトランザクションの設定を行う必要があります。

まずは、Server側。Hot RodのServer側ですが、これは通常Infinispan Serverとして起動しているでしょう。

Hot Rod Transactionで使うCacheは、以下の条件を満たす形で構成されている必要があります。

  • Isolation LevelがREPEATABLE_READ
  • Locking ModeがPESSIMISTIC(厳密には、OPTIMISTIC、またはTotal Order based commit protocolの利用を許容しない)

上記を満たす、トランザクショナルなCacheである必要があります、と。

トランザクションのモードは、通常どおりNON_XA、NON_DURABLE_XA、FULL_XAから選ぶことができますが、パフォーマンス上の理由から、NON_XAまたはNON_DURABLE_XAを
推奨しているようです。

Also, as transaction mode, it is recommended to use NON_XA or NON_DURABLE_XA. FULL_XA imposes a performance penalty and it won’t be used by your Hot Rod transaction. Hot Rod transaction will have it owns recovery mechanism.

http://infinispan.org/docs/9.3.x/user_guide/user_guide.html#server_configuration

そのうち、Hot Rod側で独自のリカバリメカニズムを持つのでしょうか?

Client側は、TransactionManagerとTransactionModeを決める必要があります。

TransactionManagerは、TransactionManagerLookupによりどのTransactionManagerを使用するかを決めます。

TransactionManagerLookupは、Hot Rod Clientでは以下の2つが用意されています。

  • GenericTransactionManagerLookup … Java EEサーバーが提供するTransactionManagerを使用する、TransactionManagerLookupの実装。利用できるTransactionManagerがない場合は、RemoteTransactionManagerを使用する
  • RemoteTransactionManagerLookup … RemoteTransactionManagerを使用する、TransactionManagerLookupの実装。RemoteTransactionManagerは、インメモリな実装で、並行性やリカバリに制限がある

TransactionModeについては、Server側とは別に、Client側でも指定する必要があります。RemoteCacheが、どのように振る舞うかです。

  • NONE … デフォルト。RemoteCacheは、TransactionManagerと対話しない(トランザクションに参加しない)
  • NON_XA … RemoteCacheをJTAのSynchronizationとして使う
  • NON_DURABLE_XA … RemoteCacheをリカバリの無効化されたXAResourceとして扱う
  • FULL_XA … RemoteCacheをリカバリを有効化されたXAResourceとして扱うが、現在未サポート

といった感じです。

では、そろそろ使うコードに入っていきましょう。

環境

確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ sbt sbtVersion
[info] Loading project definition from /path/to/remote-transaction/project
[info] Loading settings from build.sbt ...
[info] Set current project to remote-transaction (in build file:/path/to/remote-transaction/)
[info] 1.1.6

Infinispan Serverは、9.3.0.Finalを使用します。

準備

sbtでの依存関係は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % "9.3.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

Hot Rod Transactionを使うには、通常のHot Rod Clientを使う時と同じように「infinispan-client-hotrod」を使えばOKです。

Infinispan Serverを起動。

$ unzip infinispan-server-9.3.0.Final.zip
$ cd infinispan-server-9.3.0.Final
$ bin/standalone.sh -c clustered.xml

Infinispan Serverは1 Nodeとしますが、クラスタを構成可能なモードで起動しておきます。

Cacheの作成

Infinispan ServerにCacheを作成するために、まずはCLIで操作するための管理ユーザーを作成します。

$ bin/add-user.sh -u ispn-admin -p password
$ bin/ispn-cli.sh -c -u=ispn-admin -p=password --command=reload

接続。

$ bin/ispn-cli.sh -c -u=ispn-admin -p=password 
[standalone@localhost:9990 /] 

Cacheを作成。

## NON_XAなCacheを作成
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration:add(start=EAGER,mode=SYNC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_XA,locking=PESSIMISTIC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}


## NON_DURABLE_XAなCacheを作成
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonDurableXaCacheConfiguration:add(start=EAGER,mode=SYNC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonDurableXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_DURABLE_XA,locking=PESSIMISTIC)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}


## NON_XAなCacheを2つ追加
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache1:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonXaCache2:add(configuration=nonXaCacheConfiguration)
{"outcome" => "success"}

## NON_DURABLE_XAなCacheを2つ追加
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache1:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=nonDurableXaCache2:add(configuration=nonDurableXaCacheConfiguration)
{"outcome" => "success"}

リロード。

[standalone@localhost:9990 /] reload

では、これを元にテストコードで確認していきます。

テストコードの雛形

テストコードの雛形は、こちら。
src/test/scala/org/littlewings/infinispan/transaction/HotRodTransactionSpec.scala

package org.littlewings.infinispan.transaction

import java.util.concurrent.{CompletableFuture, Executors}

import org.infinispan.client.hotrod.configuration.{ConfigurationBuilder, TransactionMode}
import org.infinispan.client.hotrod.exceptions.HotRodClientException
import org.infinispan.client.hotrod.transaction.manager.RemoteTransactionManager
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager}
import org.scalatest.{FunSuite, Matchers}

class HotRodTransactionSpec extends FunSuite with Matchers {
  // ここに、テストを書く!!
}

RemoteCacheManagerおよびRemoteCacheの作成

いつもなら、テストコードの雛形の中にRemoteCacheManagerやRemoteCache取得のコードは含めてしまうのですが、今回はここもポイントになるので、個別に。

まず、Hot Rod Clientでトランザクションを扱うためには、RemoteCacheManager作成時にトランザクションを有効にしておく必要があります。

  protected def withCacheManager(transactionMode: TransactionMode = TransactionMode.NONE)(fun: RemoteCacheManager => Unit): Unit = {
    val manager =
      new RemoteCacheManager(
        new ConfigurationBuilder()
          .addServer()
          .host("172.17.0.2")
          .port(11222)
          .transaction()
          .transactionMode(transactionMode)
          .build()
      )

    try {
      fun(manager)
    } finally {
      manager.stop()
    }
  }

ConfigurationBuilderでの設定時に、transactionalメソッドでトランザクションの設定に切り替え、TransactionModeを指定します。RemoteCacheではなく、
RemoteCacheManagerの単位で全体の設定を行うことになります。

今回は使いませんが、TransactionManagerLookupの設定を行うこともできます。

TransactionConfigurationBuilder (Infinispan JavaDoc All 9.3.1.Final API)

hotrod-client.propertiesで指定する場合は、こちらのプロパティを指定する模様。

https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/ConfigurationProperties.java#L71-L72

RemoteCacheManager構築時に指定したトランザクションの設定はデフォルトの設定で、RemoteCacheを取得する時にその設定を上書きすることもできます。

今回は、このメソッドを使ってRemoteCacheManagerを使います。

さらに簡易的に、RemoteCacheを使う場合は、こちらのメソッドで。

  protected def withCache[K, V](cacheName: String, transactionMode: TransactionMode = TransactionMode.NONE)(fun: RemoteCache[K, V] => Unit): Unit =
    withCacheManager(transactionMode) { manager =>
      val cache = manager.getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    }

では、テストを書いてみましょう。

非トランザクショナルなCacheを使う

まずは、非トランザクショナルなRemoteCacheを使ってみましょう。

  test("non-transactional-cache / mode none") {
    withCache[String, String]("default", TransactionMode.NONE) { cache =>
      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      val tm = cache.getTransactionManager
      tm should be(null)
    }
  }

ふつうに使えるのはそうなのですが、TransactionModeがNONEの場合(デフォルト)は、RemoteCache#getTransactionManagerがnullを返します。

ここで、「default」というCacheはInfinispan Serverでデフォルトで用意されている次の定義です。

<distributed-cache name="default"/>

TransactionManagerは、RemoteCacheから取得するんですね。

続いて、RemoteCacheManagerを、TransactionModeをNON_DURABLE_XAで構成してTransactionManagerを使ってみます。

  test("non-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("default", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      val tm = cache.getTransactionManager

      tm should not be (null)

      tm.begin()

      val thrown = the[HotRodClientException] thrownBy cache.put("key2", "value2")
      thrown.getMessage should be("ISPN004084: Cache default doesn't support transactions. Please check the documentation how to configure it properly.")

      tm.commit()
    }
  }

Server側のCacheは非トランザクショナル、RemoteCacheはNON_DURABLE_XAという状態です。

すると、この「default」Cacheは非トランザクショナルなCacheなため操作が実行できず、例外がスローされます。

      val thrown = the[HotRodClientException] thrownBy cache.put("key2", "value2")
      thrown.getMessage should be("ISPN004084: Cache default doesn't support transactions. Please check the documentation how to configure it properly.")

というわけで、Server側で定義されているCacheは、トランザクションに参加するためにはトランザクションを有効にしたCacheである必要があることが確認できました。

以降は、トランザクショナルに構成されたCacheを使っていきます。

トランザクショナルなCacheを使う

続いて、Server側でトランザクショナルに構成されたCacheを使っていきます。

Server側のNON_XAなCacheと、RemoteCacheもNON_XAで。

  test("non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key1", "value1")
      cache.get("key1") should be("value1")

      tm.commit()

      cache.get("key1") should be("value1")


      tm.begin()

      cache.put("key2", "value2")
      cache.get("key2") should be("value2")

      tm.rollback()

      cache.get("key2") should be(null)
    }
  }

すると、TransactionManager#begin、TransactionManager#commit、TransactionManager#rollbackなどが機能し、トランザクションに参加できているように
見えます。

Server側のNON_DURABLE_XAなCacheと、NON_DURABLE_XAにしたRemoteCacheを使った場合も、同じように。

  test("non-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key3", "value3")
      cache.get("key3") should be("value3")

      tm.commit()

      cache.get("key3") should be("value3")


      tm.begin()

      cache.put("key4", "value4")
      cache.get("key4") should be("value4")

      tm.rollback()

      cache.get("key4") should be(null)
    }
  }

Server側のCacheがNON_DURABLE_XAであっても、RemoteCache側はNON_XAといったように、Server側のCacheとRemoteCacheのTransactionModeは一致している
必要はないようです。

  test("non-durable-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_XA) { cache =>
      val tm = cache.getTransactionManager

      // 省略
    }
  }

RemoteCacheManagerのデフォルトの構成としてNON_XAとして、RemoteCacheを取得する際にNON_DURABLE_XAで上書きする例。

  test("non-durable-xa-transactional-cache / mode non-xa / override non-durable-xa") {
    withCacheManager(TransactionMode.NON_XA) { manager =>
      val cache = manager.getCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA)

      val tm = cache.getTransactionManager

      tm.begin()

      cache.put("key3", "value3")
      cache.get("key3") should be("value3")

      tm.commit()

      cache.get("key3") should be("value3")


      tm.begin()

      cache.put("key4", "value4")
      cache.get("key4") should be("value4")

      tm.rollback()

      cache.get("key4") should be(null)
    }
  }

RemoteCacheManager#getCache時に、TransactionModeを指定することで上書きすることができます。

      val cache = manager.getCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA)

TransactionManagerを上書きする場合は、同じくRemoteCacheManager#getCache時に上書きしたいTransactionManagerを指定します(TransactionManagerLookupではありません)。

RemoteCacheManager#getCache(cacheName, transactionMode, transactionManager)

なお、RemoteCacheManager(RemoteCache)をFULL_XAで構成しようとすると、ドキュメントの通り未サポートなので例外がスローされます。

  test("non-xa-transactional-cache / mode full-xa") {
    withCacheManager(TransactionMode.FULL_XA) { manager =>
      val thrown = the[IllegalArgumentException] thrownBy manager.getCache[String, String]("nonXaCache")
      thrown.getMessage should be("FULL_XA isn't supported yet!")
    }
  }

複数のCacheを同じトランザクションで扱う

JTAということで、複数のRemoteCacheを同じトランザクションで扱ってみます。

Server側はNON_XA、RemoteCacheもNON_XA。

  test("non-xa-transactional-cache / mode non-xa / multiple cache") {
    withCacheManager(TransactionMode.NON_XA) { manager =>
      val cache1 = manager.getCache[String, String]("nonXaCache1")
      val cache2 = manager.getCache[String, String]("nonXaCache2")

      val tm = cache1.getTransactionManager

      tm.begin()

      cache1.put("key1-1", "value1-1")
      cache2.put("key2-1", "value2-1")

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")

      tm.commit()

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")


      tm.begin()

      cache1.put("key1-2", "value1-2")
      cache2.put("key2-2", "value2-2")

      cache1.get("key1-2") should be("value1-2")
      cache2.get("key2-2") should be("value2-2")

      tm.rollback()

      cache1.get("key1-2") should be(null)
      cache2.get("key2-2") should be(null)
    }
  }

Server側はNON_DURABLE_XA、RemoteCacheもNON_DURABLE_XA。

  test("non-durable-xa-transactional-cache / mode non-durable-xa / multiple cache") {
    withCacheManager(TransactionMode.NON_DURABLE_XA) { manager =>
      val cache1 = manager.getCache[String, String]("nonDurableXaCache1")
      val cache2 = manager.getCache[String, String]("nonDurableXaCache2")

      val tm = cache1.getTransactionManager

      tm.begin()

      cache1.put("key1-1", "value1-1")
      cache2.put("key2-1", "value2-1")

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")

      tm.commit()

      cache1.get("key1-1") should be("value1-1")
      cache2.get("key2-1") should be("value2-1")


      tm.begin()

      cache1.put("key1-2", "value1-2")
      cache2.put("key2-2", "value2-2")

      cache1.get("key1-2") should be("value1-2")
      cache2.get("key2-2") should be("value2-2")

      tm.rollback()

      cache1.get("key1-2") should be(null)
      cache2.get("key2-2") should be(null)
    }
  }

どちらも実現方法は異なりますが、まあうまくいきます。

トランザクションの分離度?

ドキュメントには、RemoteCacheに関するIsolation Levelの記載は特にありません。Server側のCacheに、REPEATABLE_READを要求しているだけです。

というわけで、ちょっと動きを見てみましょう。2つのSingleThreadのExecutorを用意して、CompletableFututeでそれぞれのスレッドでトランザクションを開始し、
操作中の値が別のスレッド(トランザクション)から参照できたりするか、見てみます。

Server側、RemoteCacheも、ともにNON_XAの場合。

  test("transaction visibility / non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial
      cache.put("key30", "value30") // initial

      val tm = cache.getTransactionManager

      val updateExecutor = Executors.newSingleThreadExecutor
      val readExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), updateExecutor)
          .thenRunAsync(() => tm.begin(), readExecutor)

          .thenRunAsync(() => cache.put("key20", "value20"), updateExecutor) // insert
          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible

          .thenRunAsync(() => cache.put("key10", "value10-1"), updateExecutor) // update
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible

          .thenRunAsync(() => cache.remove("key30"), updateExecutor) // delete
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), readExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), updateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), updateExecutor)
          .thenRunAsync(() => cache.get("key30") should be(null), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key30") should be(null), readExecutor) // reader, deleted

      future.join()

      cache.get("key20") should be("value20")
      cache.get("key10") should be("value10-1")
      cache.get("key30") should be(null)
    }
  }

片方のスレッドでput、put(update)、removeをしていますが、もう片方のスレッドからはその結果が全然見えていませんね。

更新しているトランザクションのコミット後、さらに自身のトランザクションが完了すると参照できるようになっています。まあ、分離度高いですこと。

Server側とRemoteCacheが、NON_DURABLE_XAであっても結果は同じ。

  test("transaction visibility / non-durable-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial
      cache.put("key30", "value30") // initial

      val tm = cache.getTransactionManager

      val updateExecutor = Executors.newSingleThreadExecutor
      val readExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), updateExecutor)
          .thenRunAsync(() => tm.begin(), readExecutor)

          .thenRunAsync(() => cache.put("key20", "value20"), updateExecutor) // insert
          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible

          .thenRunAsync(() => cache.put("key10", "value10-1"), updateExecutor) // update
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible

          .thenRunAsync(() => cache.remove("key30"), updateExecutor) // delete
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be(null), readExecutor) // reader, non-visible
          .thenRunAsync(() => cache.get("key10") should be("value10"), readExecutor) // reader, still-old-value-visible
          .thenRunAsync(() => cache.get("key30") should be("value30"), readExecutor) // reader, still-visible

          .thenRunAsync(() => tm.commit(), readExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), updateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), updateExecutor)
          .thenRunAsync(() => cache.get("key30") should be(null), updateExecutor)

          .thenRunAsync(() => cache.get("key20") should be("value20"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key10") should be("value10-1"), readExecutor) // reader, visible
          .thenRunAsync(() => cache.get("key30") should be(null), readExecutor) // reader, deleted

      future.join()

      cache.get("key20") should be("value20")
      cache.get("key10") should be("value10-1")
      cache.get("key30") should be(null)
    }
  }

更新が競合したら?

最後に、更新が競合した場合の動きを見てみます。

これは、"後勝ち"になるようです?

Server側、RemoteCacheがともにNON_XA。

  test("conflict / non-xa-transactional-cache / mode non-xa") {
    withCache[String, String]("nonXaCache", TransactionMode.NON_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial

      val tm = cache.getTransactionManager

      val firstUpdateExecutor = Executors.newSingleThreadExecutor
      val secondUpdateExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), firstUpdateExecutor)
          .thenRunAsync(() => tm.begin(), secondUpdateExecutor)

          .thenRunAsync(() => cache.put("key10", "value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.put("key10", "value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => tm.commit(), firstUpdateExecutor)
          .thenRunAsync(() => tm.commit(), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), firstUpdateExecutor)  // last updated
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)


      future.join()

      cache.get("key10") should be("value10-2-1")  // last updated
    }
  }

Server側、RemoteCacheがともにNON_DURABLE_XA。

  test("conflict / non-durable-xa-transactional-cache / mode non-durable-xa") {
    withCache[String, String]("nonDurableXaCache", TransactionMode.NON_DURABLE_XA) { cache =>
      cache.clear()

      cache.put("key10", "value10") // initial

      val tm = cache.getTransactionManager

      val firstUpdateExecutor = Executors.newSingleThreadExecutor
      val secondUpdateExecutor = Executors.newSingleThreadExecutor

      val future =
        CompletableFuture
          .runAsync(() => tm.begin(), firstUpdateExecutor)
          .thenRunAsync(() => tm.begin(), secondUpdateExecutor)

          .thenRunAsync(() => cache.put("key10", "value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.put("key10", "value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-1-1"), firstUpdateExecutor)
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)

          .thenRunAsync(() => tm.commit(), firstUpdateExecutor)
          .thenRunAsync(() => tm.commit(), secondUpdateExecutor)

          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), firstUpdateExecutor)  // last updated
          .thenRunAsync(() => cache.get("key10") should be("value10-2-1"), secondUpdateExecutor)


      future.join()

      cache.get("key10") should be("value10-2-1")  // last updated
    }
  }

気になるところは、だいたい確認できた感じでしょうか?

もう少し、中身を

それでは、もう少し中身のほどを、ソースコードから追ってみます。

Server側のトランザクションの構成は、Hot Rod Client側からトランザクションを有効にしていた時に、確認していましたよね。非トランザクショナルな
Cacheを使おうとすると例外になったので。

このあたりのServer側のCache設定を確認しているソースコードは、こちら。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/TransactionRequestProcessor.java#L157-L178

Server側のCacheに課せられた制限は、今後このあたりが緩くなっていくのでしょうかね。将来的に、OPTIMISTICとか設定できるようにしたそうなドキュメントに
なっているので。

if (configuration.locking().isolationLevel() != IsolationLevel.REPEATABLE_READ) {
throw log.unexpectedIsolationLevel(cache.getName());
}

//TODO because of ISPN-7672, optimistic and total order transactions needs versions. however, versioning is currently broken
if (configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC ||
configuration.transaction().transactionProtocol() == TransactionProtocol.TOTAL_ORDER) {
//no Log. see TODO.
throw new IllegalStateException(
String.format("Cache '%s' cannot use Optimistic neither Total Order transactions.", cache.getName()));
}

https://github.com/infinispan/infinispan/blob/9.3.0.Final/server/hotrod/src/main/java/org/infinispan/server/hotrod/TransactionRequestProcessor.java#L157-L178

そうそう、Server側のCacheを作る時にTransactionModeとLocking以外は指定していないのですが、

[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=nonXaCacheConfiguration/transaction=TRANSACTION:add(mode=NON_XA,locking=PESSIMISTIC)
{"outcome" => "success"}

以前のInfinispanのデフォルトのトランザクション分離レベルはREAD_COMMITTEDだったのが、Infinispan 9.0からREPEATABLE_READになっていたようですね。

[ISPN-7613] Enable write-skew for optimistic + repeatable-read transactions - JBoss Issue Tracker

これには気付いていませんでした、覚えておきましょう。

RemoteCacheManagerでトランザクションを有効にした場合、もしくはRemoteCache取得時にトランザクションの設定を上書きしてトランザクショナルなRemoteCacheに
した場合、RemoteCacheManager#getCacheで返却されるRemoteCacheの実装が変わります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L330-L335

トランザクションが無効な場合はRemoteCacheImplまたはInvalidatedNearRemoteCacheですが、トランザクションが有効な場合はTransactionalRemoteCacheImpl
となります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/TransactionalRemoteCacheImpl.java

RemoteCacheに指定するTransactionModeはNON_XA、NON_DURABLEとありますが、これがどこに影響するかというと、RemoteCacheManagerが持つTransactionTableに
なります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L95-L96

インターフェースとしてTransactionTableがあり、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/TransactionTable.java

その実装としてSyncModeTransactionTable、XaModeTransactionTableの2つがあります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/SyncModeTransactionTable.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java

どちらが使われるかは、以下のコードで決まります。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java#L447-L457

NON_XAの場合はSyncModeTransactionTable、NON_DURABLE_XAの場合はXaModeTransactionTableとなります。

両者の差は、最初に書いた通りNON_XA(SyncModeTransactionTable)の場合はJTAのSynchronizationを使った仕組みでトランザクションに参加し、
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/SyncModeTransactionTable.java#L71

NON_DURABLE_XA(XaModeTransactionTable)の場合はリカバリに制限があるXAリソースとしてトランザクションに参加します。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java#L56

リカバリ用の処理は、入ってませんよ、と。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transaction/XaModeTransactionTable.java#L181

今回は、こんなところで。

まとめ

Infinispan 9.3で追加された、Hot Rod Transactionを試してみました。

Server側とHot Rod Client側でトランザクションの設定がそれぞれ必須だったりと、ちょっと最初は慣れないところはありましたが、だいたい雰囲気は
分かったのではないかなと。

あと、今回はドキュメントがあったので助かりました。

テストコードも参考にしつつ。
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/TxFunctionalTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/MultipleCacheTxFunctionalTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/LCROTest.java
https://github.com/infinispan/infinispan/blob/9.3.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/tx/util/TransactionSetup.java

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-transaction

2018-06-22

Keycloak 4のSpring Boot 2 Adapterを試す

Keycloak 4.0.0.Finalがリリースされました。Release Notesを見ていると、その中にSpring Boot 2へのサポートが追加されたと
書かれていたので、ちょっと試してみようかと。

Keycloak 4.0.0.Final

Sprint Boot 2

ところが、ドキュメントにはSpring Boot 2についての記載がありません。Spring Boot 1向けのAdapterの記述は
あるのですが…。

Spring Boot Adapter

なので、ここはソースコードから存在を確認することに。

https://github.com/keycloak/keycloak/blob/4.0.0.Final/boms/adapter/pom.xml#L139

「keycloak-spring-boot-2-starter」を使えばよいみたいです。

最初の誤解

予備知識なしでやろうとして、KeycloakのSpring Boot 1/2 AdapterはてっきりSpring Securityと関連するものかと思って
いたのですが、どうやらそうではないようです。

KeycloakのSpring Boot Adapterと、Spring Security Adapterは別物です。

Spring Boot Adapter

Spring Security Adapter

最初、てっきりSpring Boot Adapterの方がSpring Security Adapterの延長線上にいるものかと思っていたら、全然違うことに
途中で気付きました…。

参考)
Easily secure your Spring Boot applications with Keycloak - RHD Blog

今回は、Spring Boot 2 Adapterを使うことにします。

お題

今回は、以前に書いたこちらのエントリと同様、REST APIに対してログイン必須のものとそうでないものを作成して、
認証必須の場合の挙動の確認や、Keycloakから取得できる情報の確認、ログアウトなどをやってみたいと思います。

KeycloakのJava Servlet Filter Adapterを使ってOpenID Connect - CLOVER

利用するのは、前回同様OpenID Connectです。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)


$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"

Keycloakは、4.0.0.Finalです。

準備

今回の、Maven依存関係はこちら。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
              <groupId>org.keycloak.bom</groupId>
              <artifactId>keycloak-adapter-bom</artifactId>
              <version>4.0.0.Final</version>
              <type>pom</type>
              <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.keycloak</groupId>
            <artifactId>keycloak-spring-boot-2-starter</artifactId>
        </dependency>
    </dependencies>

keycloak-adapter-bomのimportと、keycloak-spring-boot-2-starterの依存関係の追加だけではダメで、Spring Boot自体も
dependencyManagementに入れておかないと、実行時にライブラリのバージョン不整合が起こってエラーになります…。

Keycloakは起動済みとして、Keycloakおよび母体のWildFlyの管理アカウントを作成します。

$ bin/add-user-keycloak.sh -u keycloak-admin -p password
$ bin/add-user.sh -u admin -p password
$ bin/jboss-cli.sh -c -u=admin -p=password --command=reload

REST APIで使うユーザーは、あとで作成しましょう。

サンプルコードの作成

では、サンプルコードを作っていきます。

まずは、ログインしなくてもアクセスできる@RestController。
src/main/java/org/littlewings/keycloak/spring/controller/PublicController.java

package org.littlewings.keycloak.spring.controller;

import java.security.Principal;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;

import org.keycloak.KeycloakSecurityContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("public")
public class PublicController {
    @GetMapping("hello")
    public String hello() {
        return "Hello Application!!";
    }

    @GetMapping("keycloak-context")
    public Map<String, Object> keycloakContext(HttpServletRequest request) {
        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = request.getUserPrincipal();
        if (principal != null) {
            results.put("principal-type", principal.getClass().getName());
            results.put("principal-name", principal.getName());
        }

        KeycloakSecurityContext context =
                (KeycloakSecurityContext) request.getAttribute(KeycloakSecurityContext.class.getName());
        if (context != null) {
            results.put("id-token-from-request", context.getIdToken());
            results.put("roles-from-request", context.getToken().getRealmAccess().getRoles());
        }

        KeycloakSecurityContext contextFromSession =
                (KeycloakSecurityContext) request.getSession().getAttribute(KeycloakSecurityContext.class.getName());
        if (contextFromSession != null) {
            results.put("id-token-from-session", contextFromSession.getIdToken());
            results.put("roles-from-session", context.getToken().getRealmAccess().getRoles());
        }

        return results;
    }
}

2つ目のメソッドでは、Keycloakの情報にHttpServletRequestまたはHttpSessionからアクセスしようとしています。

    @GetMapping("keycloak-context")
    public Map<String, Object> keycloakContext(HttpServletRequest request) {
        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = request.getUserPrincipal();
        ...

        KeycloakSecurityContext context =
                (KeycloakSecurityContext) request.getAttribute(KeycloakSecurityContext.class.getName());
        ...

        KeycloakSecurityContext contextFromSession =
                (KeycloakSecurityContext) request.getSession().getAttribute(KeycloakSecurityContext.class.getName());
        ...

続いて、アクセスするのにログインが必要な@RestController。
src/main/java/org/littlewings/keycloak/spring/controller/SecureController.java

package org.littlewings.keycloak.spring.controller;

import java.net.URI;
import java.security.Principal;
import java.util.LinkedHashMap;
import java.util.Map;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;

import org.keycloak.KeycloakSecurityContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("secure")
public class SecureController {
    @GetMapping("hello")
    public String hello() {
        return "Hello Secure Application!!";
    }

    @GetMapping("keycloak-context")
    public Map<String, Object> keycloakContext(HttpServletRequest request) {
        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = request.getUserPrincipal();
        if (principal != null) {
            results.put("principal-type", principal.getClass().getName());
            results.put("principal-name", principal.getName());
        }

        KeycloakSecurityContext context =
                (KeycloakSecurityContext) request.getAttribute(KeycloakSecurityContext.class.getName());
        if (context != null) {
            results.put("id-token-from-request", context.getIdToken());
            results.put("roles-from-request", context.getToken().getRealmAccess().getRoles());
        }

        KeycloakSecurityContext contextFromSession =
                (KeycloakSecurityContext) request.getSession().getAttribute(KeycloakSecurityContext.class.getName());
        if (contextFromSession != null) {
            results.put("id-token-from-session", contextFromSession.getIdToken());
            results.put("roles-from-session", context.getToken().getRealmAccess().getRoles());
        }

        return results;
    }

    @GetMapping("logout")
    public ResponseEntity<String> logout(HttpServletRequest request) throws ServletException {
        request.logout();

        return ResponseEntity.status(HttpStatus.SEE_OTHER).location(URI.create("/public/hello")).build();
    }
}


    @GetMapping("keycloak-context")
    public Map<String, Object> keycloakContext(HttpServletRequest request) {
        Map<String, Object> results = new LinkedHashMap<>();

        Principal principal = request.getUserPrincipal();
        ...

        KeycloakSecurityContext context =
                (KeycloakSecurityContext) request.getAttribute(KeycloakSecurityContext.class.getName());
        ...

        KeycloakSecurityContext contextFromSession =
                (KeycloakSecurityContext) request.getSession().getAttribute(KeycloakSecurityContext.class.getName());
        ...

        return results;
    }

    @GetMapping("logout")
    public ResponseEntity<String> logout(HttpServletRequest request) throws ServletException {
        request.logout();

        return ResponseEntity.status(HttpStatus.SEE_OTHER).location(URI.create("/public/hello")).build();
    }

ところで、今回使っているKeycloakのAPI、特にSpring Boot Adapterには説明がありません。

Spring Boot Adapter

で、ドキュメントのどのAPIを見ればよいかというと、Java Servlet Filter Adapterなどと同じく、Servlet APIで記載されたドキュメントを見ます。

Security Context

Error Handling

Logout

Spring Boot Adapterのドキュメントとしても、サポートしているEmbedded Containerは、

ということなので、やっぱりServletベースなのですね。

また、Spring Bootを使ったアプリケーションの起動クラスを作成します。
src/main/java/org/littlewings/keycloak/spring/App.java

package org.littlewings.keycloak.spring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }
}

設定

Spring Boot 1/2 Adapterを使うと、keycloak.jsonではなく、application.propertiesにKeycloakの設定を書くことができます。

今回作成したapplication.propertiesは、こちら。
src/main/resources/application.properties

keycloak.realm = demo-api
keycloak.auth-server-url = http://172.17.0.2:8080/auth
keycloak.resource = sample-rest-api
keycloak.credentials.secret = 72235f7b-06b3-4efd-96be-d8f4f8fada74

keycloak.security-constraints[0].auth-roles[0] = users
keycloak.security-constraints[0].security-collections[0].name = secure area
keycloak.security-constraints[0].security-collections[0].patterns[0] = /secure/*

spring.jackson.serialization.indent_output=true

realmは「demo-api」、KeycloakへのアクセスURLは「http://172.17.0.2:8080/auth」、Keycloak側へ登録するClientは「sample-rest-api」、
Clientを作成した時のCredintialを設定します。
※Credentialは、KeycloakでClientを作成してから判明します

アクセス設定は、「/secure/*」配下のURLに、「users」ロールを持ったアカウントのみアクセスできるようにしています。

Keycloak側での設定

OpenID Connectでアクセスする、Keycloak側の設定をします。まあ、順番的には先にKeycloakの設定をしてから、クライアントアプリケーションの設定の順番
なのですが、今回はこういう感じで。

Realmの作成。名前は、「demo-api」。
f:id:Kazuhira:20180622234412p:image

Clientの作成。名前は、「sample-rest-api」。
f:id:Kazuhira:20180623000424p:image

「Access Type」を「confidential」に変更して「Save」すると、Credentialsを確認することができるようになります。
f:id:Kazuhira:20180623000705p:image

これを、application.propertiesに設定します。

keycloak.credentials.secret = 72235f7b-06b3-4efd-96be-d8f4f8fada74

続いて、ロールを作成。「users」と、「others」の2つのロールを作成します。
※「others」は省略
f:id:Kazuhira:20180623001030p:image

最後に、ユーザー作成。ロール「users」に紐付ける「api-user」と、「others」に紐付ける「other-user」を作成します。
※パスワードの設定は、省略しています
※「other-user」は省略
f:id:Kazuhira:20180623004054p:image

ロールの紐付け。
f:id:Kazuhira:20180623004050p:image

ユーザーのロールの割り当てとKeycloakのクライアント側の設定によって、ログイン後にどの範囲にアクセスできるかどうかが決まります。

確認してみる

それでは、確認してみましょう。

パッケージング。

$ mvn package

起動。

$ java -jar target/keycloak-spring-boot-adapter-0.0.1-SNAPSHOT.jar

まずは、ログインが必要ない「/public/hello」へ。そのまま表示できます。
f:id:Kazuhira:20180623004749p:image

Keycloakの情報を表示する「/public/keycloak-context」ページは、ログインしないままだと中身が空っぽになります。
f:id:Kazuhira:20180623004936p:image

次に、ログインを要求する「/secure/hello」にアクセスしてみます。すると、Keycloakへのログインを求められるので、「api-user」でログインします。
f:id:Kazuhira:20180623005405p:image

今度は、アクセスできます。
f:id:Kazuhira:20180623005615p:image

「/secure/keycloak-context」にアクセスすると、Keycloakから取得した情報を参照することができます。
f:id:Kazuhira:20180623005845p:image

この状態で、「/public/keycloak-context」へアクセスすると、今度はログインが必須のページではなくてもKeycloakの情報を表示することができます。
f:id:Kazuhira:20180623010136p:image

「/secure/logout」でログアウト。

ログアウトすると、ログインが必要ない「/public/hello」に戻り、ログイン状態も解除されます(「/secure配下にアクセスすると、Keycloakからログインを
求められる)。

ログアウト後、今度は「others」ロールを持つ「other-user」でアクセスしてみます。すると、403エラーになりました。
f:id:Kazuhira:20180623010651p:image

ここの設定どおり、「users」ロールに属するユーザーでなければ、アクセスできません、と。

keycloak.security-constraints[0].auth-roles[0] = users
keycloak.security-constraints[0].security-collections[0].name = secure area
keycloak.security-constraints[0].security-collections[0].patterns[0] = /secure/*

もう少し中身を

KeycloakのSpring Boot Adapterは、サポートしているコンテナ、Tomcat、Undertow、JettyにKeycloakの機能を組み込んでいくものみたいです。
https://github.com/keycloak/keycloak/blob/4.0.0.Final/adapters/oidc/spring-boot-adapter-core/src/main/java/org/keycloak/adapters/springboot/KeycloakBaseSpringBootConfiguration.java

Servlet Filterなどではありません。

また、設定可能な項目はこちら。
https://github.com/keycloak/keycloak/blob/4.0.0.Final/adapters/oidc/spring-boot-adapter-core/src/main/java/org/keycloak/adapters/springboot/KeycloakSpringBootProperties.java

通常のWebアプリケーションであれば、keycloak.jsonとweb.xmlで設定するところをapplication.propertiesで設定できるということでしょう。

この部分は、まさしくweb.xmlのsecurity-constraintに該当しますね。

keycloak.security-constraints[0].auth-roles[0] = users
keycloak.security-constraints[0].security-collections[0].name = secure area
keycloak.security-constraints[0].security-collections[0].patterns[0] = /secure/*

まとめ

Keycloak 4.0.0.Finalで追加された、Spring Boot 2 Adapterを試してみました。

そもそもKeycloakでSpring Bootを使うこと自体が初めてでしたし、Spring Securityとは関係ない世界にいるんだなーということを最初認識していなくて
戸惑ったりしましたが、そのあたりも把握できてよかったです。

2018-06-19

Spring Cloud StreamのReactive Programming Supportで遊ぶ

Spring Cloud Streamには、Reactor…Reactive Programmingに対するサポートがあります。

Programming Model / Reactive Programming Support

今まで、Reactiveでない方のAPI(詳細見てませんけど、対比してなんて言うんだ?)を使ってばかりでしたが、今回はReactorと合わせて
Spring Cloud Streamを使ってみたいと思います。

この機能を使うと、ReactorのFluxなどのAPIを使って、Spring Cloud Streamの機能を使ったプログラムを書くことができます。

お題

今回のお題は、こういうものでいきましょう。

  • Source、Processor、Sinkの構成とする
  • 各要素をつなぐメッセージングミドルウェアは、Apache Kafkaとする
  • [Source] TwitterのストリームAPIを使い、「#nowplaying」タグを含むツイートをSourceとしてApache Kafkaに放り込む
  • [Processor] CodelibsのApache Lucene Kuromoji Analyzerを使って、ツイートを単語に分解する
    • この時、単語にツイートの日時も付加するようにする
  • [Sink] Processorにより分解された単語を、Window処理でWord Countする
    • Window処理はウィンドウサイズを1分、スライディングする時間を10秒として設定
    • あまり大量に出力されるのも見づらいので、あるウィンドウ内で出現数が5以上の単語のみを出力
    • 最後に、該当のウィンドウに含まれた単語がツイートされた時間をまとめて出力し、ウィンドウやスライディングの処理が想定通りになっているか確認する

要するに、ツイートを1分間ごとにWord Countしようというお題ですね。

これを、ReactorのAPIを使って実現します。

環境

Javaに関する情報は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)


$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-23-generic", arch: "amd64", family: "unix"

Apache Kafkaは1.1.0を使用し、1 Nodeとします。また、事前に起動済みとします。

Spring Cloud Streamのバージョンは、Elmhurst.RELEASEです。

Maven依存関係

ここでは、親pom.xmlの内容をまずは紹介。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Elmhurst.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.1.RELEASE</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

このサブモジュールとして、Source、Processor、Sinkを作っていきます。

Source

では、まずはSourceを作成します。Maven依存関係は、こちら。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.6</version>
        </dependency>

ミニマムでいっているので、Spring WebFluxすら使用していません。

TwitterAPIの利用には、Twitter4Jを使用します。事前に、アカウントと「twitter4j.properties」を用意しておいてください。

データはSpring Cloud Stream経由でApache Kafkaに登録するため「spring-cloud-starter-stream-kafka」が必要ですが、Reactorを使うには
さらに「spring-cloud-stream-reactive」が必要です。

Sourceのソースコードは、こちら。
twitter-source/src/main/java/org/littlewings/spring/cloud/Twiturce.java

package org.littlewings.spring.cloud;

import java.util.LinkedHashMap;
import java.util.Map;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.reactive.StreamEmitter;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

@SpringBootApplication
@EnableBinding(Source.class)
public class TwitteStreamSource {
    Logger logger = LoggerFactory.getLogger(TwitteStreamSource.class);

    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();

    public static void main(String... args) {
        SpringApplication.run(TwitteStreamSource.class, args);
    }

    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<Map<String, Object>> streaming() {
        return Flux.create(sink -> {
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    logger.info("tweet = {}", tweet);
                    sink.next(tweet);
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                }

                @Override
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                }

                @Override
                public void onScrubGeo(long userId, long upToStatusId) {
                }

                @Override
                public void onStallWarning(StallWarning warning) {
                }

                @Override
                public void onException(Exception e) {
                    e.printStackTrace();
                }
            });

            twitterStream.filter("#nowplaying");
        });
    }

    @PreDestroy
    public void stop() {
        twitterStream.clearListeners();
        twitterStream.cleanUp();
    }
}

ここでのポイントは、@StreamEmitterアノテーションを付与したメソッドとし、@Output(Source.OUTPUT)を指定していることです。この宣言の状態で、
Fluxなどを返すとOKです。

    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<Map<String, Object>> streaming() {
        return Flux.create(sink -> {

なお、@OutputについてはFluxSenderを使用してデータを送っても大丈夫です。その場合は@StreamListenerとなっており、サンプルがドキュメントに
記載されています。
Programming Model / Reactive Programming Support

Fluxは、Flux#createを使って、Twitter4JのStatusListenerが受け取った値をMapに詰め替えてSinkに渡す実装としています。

        return Flux.create(sink -> {
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    logger.info("tweet = {}", tweet);
                    sink.next(tweet);
                }

一応トレースもしたいので、ツイート内容はログ出力するようにしています。

アプリケーションの設定は、こちら。
twitter-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=tweet-source-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Apache Kafkaが動作しているサーバーは、「172.17.0.2」とします。

Twitter4Jの利用にあたっては、「twitter4j.properties」に設定を行っています。
twitter-source/src/main/resources/twitter4j.properties

Processor

続いて、ProcessorMaven依存関係は、こちらです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codelibs</groupId>
            <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId>
            <version>7.3.1-20180604</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>codelibs.org</id>
            <name>CodeLibs Repository</name>
            <url>http://maven.codelibs.org/</url>
        </repository>
    </repositories>

Apache Kuromojiは、Codelibsによりmecab-ipadic-NEologdを組み込んでビルドされたものを使用します。

Processorソースコードは、こちら。
tokenize-processor/src/main/java/org/littlewings/spring/cloud/TweetTokenizeProcessor.java

package org.littlewings.spring.cloud;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Processor.class)
public class TweetTokenizeProcessor {
    Logger logger = LoggerFactory.getLogger(TweetTokenizeProcessor.class);

    JapaneseAnalyzer analyzer = new JapaneseAnalyzer();

    public static void main(String... args) {
        SpringApplication.run(TweetTokenizeProcessor.class, args);
    }

    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {
        return tweets.flatMap(tweet -> {
            String tweetText = (String) tweet.get("text");

            TokenStream tokenStream = analyzer.tokenStream("", tweetText);
            CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);

            try {
                List<String[]> tokens = new ArrayList<>();

                tokenStream.reset();

                while (tokenStream.incrementToken()) {
                    String token = charTermAttribute.toString();

                    if (token.length() >= 5) {
                        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
                        logger.info("token = {}, time = {}", token, dateAsString);

                        String[] tokenAndDate = {token, dateAsString};
                        tokens.add(tokenAndDate);
                    }
                }

                tokenStream.end();

                return Flux.fromIterable(tokens);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                try {
                    tokenStream.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        });
    }
}

今回は@StreamListenerとして定義し、入力を@InputかつFluxとして、結果を@OutputでFluxとして返すように作成しています。

    @StreamListener
    @Output(Processor.OUTPUT)
    public Flux<String[]> receive(@Input(Processor.INPUT) Flux<Map<String, Object>> tweets) {

形態素解析後は、5文字以上の単語を、ツイートの日時と一緒にApache Kafkaに送ります。

                while (tokenStream.incrementToken()) {
                    String token = charTermAttribute.toString();

                    if (token.length() >= 5) {
                        String dateAsString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(tweet.get("createdAt"));
                        logger.info("token = {}, time = {}", token, dateAsString);

                        String[] tokenAndDate = {token, dateAsString};
                        tokens.add(tokenAndDate);
                    }
                }

アプリケーションの設定は、こちら。
tokenize-processor/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=tweet-source-topic
spring.cloud.stream.bindings.input.group=tweet-source-group

spring.cloud.stream.bindings.output.destination=tweet-word-topic
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Sink

最後は、Sinkです。

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-reactive</artifactId>
        </dependency>

ソースコードは、こちら。
token-sink/src/main/java/org/littlewings/spring/cloud/WordCountSink.java

package org.littlewings.spring.cloud;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@SpringBootApplication
@EnableBinding(Sink.class)
public class WordCountSink {
    Logger logger = LoggerFactory.getLogger(WordCountSink.class);

    public static void main(String... args) {
        SpringApplication.run(WordCountSink.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void wordCount(Flux<String[]> words) {
        words
                .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))
                .subscribe(ws -> {
                    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
                        acc.addDate(word[1]);
                        acc.increment(word[0]);
                        return acc;
                    });

                    records.subscribe(m -> {
                        m.getWordCounts().entrySet().forEach(entry -> {
                            if (entry.getValue() >= 5) {
                                logger.info(
                                        "word = {}, count = {}",
                                        entry.getKey(),
                                        entry.getValue()
                                );
                            }
                        });

                        logger.info("contain times = {}", m.times);
                    });
                });
    }

    static class TimeRecords {
        Set<String> times = new TreeSet<>();

        Map<String, Integer> wordCounts = new TreeMap<>();

        public void addDate(String date) {
            times.add(date);
        }

        public void increment(String word) {
            wordCounts.merge(word, 1, Integer::sum);
        }

        public Map<String, Integer> getWordCounts() {
            return wordCounts;
        }
    }
}

@StreamListenerを付けて、Processorで単語分割した結果と、ツイートの日時(文字列ですが)をFluxとして受け取ります。

    @StreamListener(Sink.INPUT)
    public void wordCount(Flux<String[]> words) {

そして、これをWord Count。

ウィンドウサイズを1分にして、10秒ごとにスライドさせるようにしました。

        words
                .window(Duration.ofMinutes(1L), Duration.ofSeconds(10L))

Word Countは、ちょっと箱物クラスを作って、その中で行います。

    static class TimeRecords {
        Set<String> times = new TreeSet<>();

        Map<String, Integer> wordCounts = new TreeMap<>();

        public void addDate(String date) {
            times.add(date);
        }

        public void increment(String word) {
            wordCounts.merge(word, 1, Integer::sum);
        }

        public Map<String, Integer> getWordCounts() {
            return wordCounts;
        }
    }

単語と出現回数自体はMapで持つのですが、その時に単語分割した時に得られた日時も持っておきます。

これで、ウィンドウ内に含まれる時間を見ようかなと。

集計部分。

                .subscribe(ws -> {
                    Mono<TimeRecords> records = ws.reduce(new TimeRecords(), (acc, word) -> {
                        acc.addDate(word[1]);
                        acc.increment(word[0]);
                        return acc;
                    });

                    records.subscribe(m -> {
                        m.getWordCounts().entrySet().forEach(entry -> {
                            if (entry.getValue() >= 5) {
                                logger.info(
                                        "word = {}, count = {}",
                                        entry.getKey(),
                                        entry.getValue()
                                );
                            }
                        });

                        logger.info("contain times = {}", m.times);
                    });
                });

結果は標準出力に出すのですが、あんまり出力されすぎるのもなんなので、登場回数が5回以上のものにしました。

ただ、処理対象となった日時は全部含めるものとしています。

アプリケーションの設定は、こちら。
token-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=tweet-word-topic
spring.cloud.stream.bindings.input.group=tweet-word-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

実行

では、作成したアプリケーションを実行してみます。

パッケージング。

$ mvn package

実行。

## Source
$ java -jar twitter-source/target/twitter-source-0.0.1-SNAPSHOT.jar


## Processor
$ java -jar tokenize-processor/target/tokenize-processor-0.0.1-SNAPSHOT.jar


## Sink
$ java -jar token-sink/target/token-sink-0.0.1-SNAPSHOT.jar

実行ログ。

Source…のログは、Twitterのidとかいろいろ出てくるので、省略。

Processor。httpsとか入っているのは、ご愛嬌…。

2018-06-19 21:02:57.645  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = リニューアル, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.646  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.807  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = 08234, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.808  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.858  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = rokomama, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.859  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.860  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = bdbyh, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.869  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = hjinny, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = ジェジュン, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = nowplaying, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = https, time = 2018-06-19 21:02:57
2018-06-19 21:02:57.870  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = sarwrsgk, time = 2018-06-19 21:02:57
2018-06-19 21:02:58.158  INFO 15234 --- [container-0-C-1] o.l.spring.cloud.TweetTokenizeProcessor  : token = smallzy, time = 2018-06-19 21:02:57

Sink側は、最初は1分、それ以降は10秒ごとに、スライドしながら結果が出力されます。

2018-06-19 21:03:34.308  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = =love, count = 5
2018-06-19 21:03:34.308  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = bornfreeonekiss, count = 14
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = favourite, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = https, count = 232
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = japan, count = 7
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = jejung, count = 30
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = let's, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = listen, count = 11
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = listenlive, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = love music, count = 5
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = mrskodako, count = 6
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = music, count = 14
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = nowplaying, count = 273
2018-06-19 21:03:34.309  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = radio, count = 9
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = single, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = smallzy, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = smallzyssurgery, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = sukatto, count = 7
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = the the, count = 11
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = youngblood, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = ありがとう, count = 10
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = ジェジュン, count = 166
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = リクエスト, count = 15
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : word = 痛快tvスカッとジャパン, count = 5
2018-06-19 21:03:34.310  INFO 15255 --- [     parallel-2] o.l.spring.cloud.WordCountSink           : contain times = [2018-06-19 21:02:34, 2018-06-19 21:02:35, 2018-06-19 21:02:36, 2018-06-19 21:02:37, 2018-06-19 21:02:38, 2018-06-19 21:02:39, 2018-06-19 21:02:40, 2018-06-19 21:02:41, 2018-06-19 21:02:42, 2018-06-19 21:02:43, 2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34]


2018-06-19 21:03:44.309  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = =love, count = 5
2018-06-19 21:03:44.309  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = bornfreeonekiss, count = 15
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = favourite, count = 8
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = https, count = 231
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = instagram, count = 5
2018-06-19 21:03:44.310  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = japan, count = 7
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = jejung, count = 30
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = let's, count = 8
2018-06-19 21:03:44.311  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = listen, count = 12
2018-06-19 21:03:44.312  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = listenlive, count = 6
2018-06-19 21:03:44.312  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = love music, count = 5
2018-06-19 21:03:44.313  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = mrskodako, count = 5
2018-06-19 21:03:44.313  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = music, count = 13
2018-06-19 21:03:44.314  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = nowplaying, count = 278
2018-06-19 21:03:44.314  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = radio, count = 13
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = retweet, count = 7
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = single, count = 5
2018-06-19 21:03:44.315  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = smallzy, count = 8
2018-06-19 21:03:44.316  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = smallzyssurgery, count = 8
2018-06-19 21:03:44.316  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = sukatto, count = 7
2018-06-19 21:03:44.317  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = the now, count = 6
2018-06-19 21:03:44.317  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = the the, count = 7
2018-06-19 21:03:44.318  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = tweet, count = 7
2018-06-19 21:03:44.318  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = youngblood, count = 8
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = ありがとう, count = 9
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = ジェジュン, count = 162
2018-06-19 21:03:44.319  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : word = リクエスト, count = 14
2018-06-19 21:03:44.320  INFO 15255 --- [     parallel-3] o.l.spring.cloud.WordCountSink           : contain times = [2018-06-19 21:02:44, 2018-06-19 21:02:45, 2018-06-19 21:02:46, 2018-06-19 21:02:47, 2018-06-19 21:02:48, 2018-06-19 21:02:49, 2018-06-19 21:02:50, 2018-06-19 21:02:51, 2018-06-19 21:02:52, 2018-06-19 21:02:53, 2018-06-19 21:02:54, 2018-06-19 21:02:55, 2018-06-19 21:02:56, 2018-06-19 21:02:57, 2018-06-19 21:02:58, 2018-06-19 21:02:59, 2018-06-19 21:03:00, 2018-06-19 21:03:01, 2018-06-19 21:03:02, 2018-06-19 21:03:03, 2018-06-19 21:03:04, 2018-06-19 21:03:05, 2018-06-19 21:03:06, 2018-06-19 21:03:07, 2018-06-19 21:03:08, 2018-06-19 21:03:09, 2018-06-19 21:03:10, 2018-06-19 21:03:11, 2018-06-19 21:03:12, 2018-06-19 21:03:13, 2018-06-19 21:03:14, 2018-06-19 21:03:15, 2018-06-19 21:03:16, 2018-06-19 21:03:17, 2018-06-19 21:03:18, 2018-06-19 21:03:19, 2018-06-19 21:03:20, 2018-06-19 21:03:21, 2018-06-19 21:03:22, 2018-06-19 21:03:23, 2018-06-19 21:03:24, 2018-06-19 21:03:25, 2018-06-19 21:03:26, 2018-06-19 21:03:27, 2018-06-19 21:03:28, 2018-06-19 21:03:29, 2018-06-19 21:03:30, 2018-06-19 21:03:31, 2018-06-19 21:03:32, 2018-06-19 21:03:33, 2018-06-19 21:03:34, 2018-06-19 21:03:35, 2018-06-19 21:03:36, 2018-06-19 21:03:37, 2018-06-19 21:03:38, 2018-06-19 21:03:39, 2018-06-19 21:03:40, 2018-06-19 21:03:41, 2018-06-19 21:03:42, 2018-06-19 21:03:43]

ログを見ると、どの範囲の時間が含まれているか、わかる感じですね。

まとめ

今回は、Spring Cloud StreamのReactive Programming Supportを試すということで、Twitterからストリームを読み込み、Apache Kafkaに放り込み、単語分割し、
最後にWord CountするところまでをReactorを使ってやってみました。

最近Reactorを使っていなかったのでちょっとてこずりましたが、使っていないAPIなども試しつつ、理解が進んだ感じでよかったかなと。