Hatena::ブログ(Diary)

CLOVER

2016-07-27

Spring Data GeodeをとりあえずSpring Boot Starterなしで使う

Apache Geode 1.0.0-incubating.M2がリリースされた時に、Spring Data Geodeの1.0.0.APACHE-GEODE-INCUBATING-M2がリリースされました。

Spring Data Geode 1.0.0.APACHE-GEODE-INCUBATING-M2 Released

Spring Data Gemfireのバージョンのうち、特にApache Geodeのサポートに寄ったものをSpring Data Geodeと呼ぶみたいですね。
※ところで、Spring Data Geodeの個々のバージョンがSpring Data Gemfireのどれと近いのかってわかるんでしょうか…

で、Spring Data Gemfireがリリースされていたことは知っていましたが、試してはいませんでした。

Spring Boot Starterはまだないし、なんか忘れましたがムリヤリ使おうとしてハマったような気が…。

まあ、Geode用のStarterはGeodeのGA待ちみたいです。

Add starter and sample for Apache Geode. by jxblum ? Pull Request #5445 ? spring-projects/spring-boot ? GitHub

ところでですね、Spring BootのSpring Data Gemfire用のStarterとAutoConfigureってほとんど中身がないのを見て、これってSpring Data Geodeだけで動かせるんじゃ?と思い、試してみました。

https://github.com/spring-projects/spring-boot/tree/v1.3.6.RELEASE/spring-boot-starters/spring-boot-starter-data-gemfire
https://github.com/spring-projects/spring-boot/tree/v1.3.6.RELEASE/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data
※gemfireパッケージがない

とりあえず、最小構成で動かしてみましょう。

準備

pomの定義から。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>p2p-spring-data-without-boot-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spring Data Geode -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-geode</artifactId>
            <version>1.0.0.APACHE-GEODE-INCUBATING-M2</version>
        </dependency>

        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- AssertJ -->
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Spring Data Geode 1.0.0.APACHE-GEODE-INCUBATING-M2、トランザクションも軽く使うのでSpring Tx、あとはSpring Boot関連とAssertJを足しています。

Apache Geode用のJavaConfigを書く

Apache GeodeのCache/Regionを定義するための、JavaConfigを書きます。トランザクション管理も有効にしておきます。
src/main/java/org/littlewings/geode/springdata/GeodeConfig.java

package org.littlewings.geode.springdata;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.RegionAttributes;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.data.gemfire.GemfireTransactionManager;
import org.springframework.data.gemfire.PartitionAttributesFactoryBean;
import org.springframework.data.gemfire.PartitionedRegionFactoryBean;
import org.springframework.data.gemfire.RegionAttributesFactoryBean;
import org.springframework.data.gemfire.repository.config.EnableGemfireRepositories;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableGemfireRepositories
@EnableTransactionManagement
public class GeodeConfig {
    @Bean
    public CacheFactoryBean gemfireCache() {
        CacheFactoryBean cacheFactory = new CacheFactoryBean();
        cacheFactory.setClose(true);
        return cacheFactory;
    }

    @Bean(name = "myPartitionedRegion")
    public PartitionedRegionFactoryBean<String, String> partitionedRegion(Cache cache, RegionAttributes<String, String> regionAttributes) {
        PartitionedRegionFactoryBean<String, String> partitionedRegionFactory = new PartitionedRegionFactoryBean<>();
        partitionedRegionFactory.setAttributes(regionAttributes);
        partitionedRegionFactory.setClose(true);
        partitionedRegionFactory.setCache(cache);
        partitionedRegionFactory.setRegionName("myPartitionedRegion");
        partitionedRegionFactory.setPersistent(false);
        return partitionedRegionFactory;
    }

    @Bean
    public RegionAttributesFactoryBean regionAttributes(PartitionAttributes<String, String> partitionAttributes) {
        RegionAttributesFactoryBean regionAttributesFactory = new RegionAttributesFactoryBean();
        regionAttributesFactory.setKeyConstraint(String.class);
        regionAttributesFactory.setValueConstraint(String.class);
        regionAttributesFactory.setPartitionAttributes(partitionAttributes);
        return regionAttributesFactory;
    }

    @Bean
    public PartitionAttributesFactoryBean partitionAttributes() {
        PartitionAttributesFactoryBean partitionAttributesFactory = new PartitionAttributesFactoryBean();
        partitionAttributesFactory.setRedundantCopies(1);
        return partitionAttributesFactory;
    }

    @Bean
    public GemfireTransactionManager gemfireTransactionManager(Cache cache) {
        return new GemfireTransactionManager(cache);
    }
}

@EnableGemfireRepositoriesを付けてSpring Data Geodeを有効に、@EnableTransactionManagementを付けてトランザクション管理を有効に。

あとはCacheと、今回はPartitionedRegionを定義しています。

テストを書く

それでは、動作確認を兼ねてテストコードを書いて動かしてみます。
src/test/java/org/littlewings/geode/springdata/SpringDataGeodeTest.java

package org.littlewings.geode.springdata;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(GeodeConfig.class)
public class SpringDataGeodeTest {
    @Autowired
    Cache cache;

    @Autowired
    PlatformTransactionManager transactionManager;

    @Before
    public void setUp() {
        Region<String, String> region = cache.getRegion("myPartitionedRegion");
        region.keySet().forEach(key -> region.remove(key));
    }

    @Test
    public void simpleUsage() {
        Region<String, String> region = cache.getRegion("myPartitionedRegion");

        region.put("key", "value");
        assertThat(region.get("key")).isEqualTo("value");
    }

    @Test
    public void txCommit() {
        Region<String, String> region = cache.getRegion("myPartitionedRegion");

        new TransactionTemplate(transactionManager)
                .execute(status -> region.put("key", "value"));

        assertThat(region.get("key")).isEqualTo("value");
    }

    @Test
    public void txRollback() {
        Region<String, String> region = cache.getRegion("myPartitionedRegion");

        new TransactionTemplate(transactionManager).execute(status -> {
            region.put("key", "value");
            status.setRollbackOnly();
            return null;
        });

        assertThat(region.get("key")).isNull();
    }
}

単純なものですが、OKでした。

まとめ

Spring Data Geodeを、ふつうに依存関係に突っ込むというやり方でSpring Bootに投げ込んで使ってみました。

Spring Data Geode用のStarterができるまでは、これでいいかなぁと。

個人的にはSpring Data Geode、Spring Data Gemfireとのバージョンの関連性がよくわからなくなったのと(前は1.7.0.APACHE-GEODE-EA-M1とかでした)、メンテって大変なのかな?(別ブランチっぽいし、Gemfire/Geodeの差とか)とちょっと気になったり。

2016-07-24

RabbitMQのQueueingConsumer(RPC)を試す

RabbitMQのチュートリアルの最後にある、RPCを試してみます。

Remote procedure call (RPC)

タイトルだけ見ると、「キューなのにRPC??」という感じですが、よくよく見るとこんな感じっぽいです。

  • BlockingQueueを内部に持ったConsumerの拡張クラス、QueueingConsumerを受信したメッセージをBlockingQueueに格納
  • Client/ServerともにQueueingConsumer#nextDeliveryを呼び出してメッセージを受信するまでブロックし、受信したら処理開始
  • Server側は、メッセージの送信側にメッセージを送り返す

このチュートリアルのサンプルでは、Client/Serverの両方ともChannel#basicConsumeを使用してキューにConsumerを登録します。

あとはコードを見た方が早いような気がしますが…、一応、チュートリアルの注意としてはRCPを使うことについて、こんな注意事項が書いてあります。

  • 呼び出している処理が実はローカルの関数呼び出しではなく、RPCの場合、しかもプログラマがそれに気付いていない場合は問題が発生するかもしれない
  • システムを複雑化し、デバッグをしづらくする
  • RPCの乱用は、ソフトウェアのシンプルさを失わせメンテナンスの難しいスパゲッティコードを生み出すかもしれない

とまあ、けっこう否定的ですね。使うなら、リモート呼び出しであることをちゃんとドキュメント化しなさいとか、エラーケース(Server側がダウンしている時など)について考慮しなさいとかが書かれています。

前置きはこれくらいにして、書いていってみましょう。

準備

RabbitMQ自体は、起動済みとします。また、アクセスの際には認証を行います(ID/パスワードは、コードに書いています)。

Maven依存関係は、こちら。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJは、テストコード用です。

テストコードの雛形

まずは、テストコードの大枠だけを書いてみます。

import文などを含めて、こんな感じです。
src/test/java/org/littlewings/rabbitmq/rpc/RpcTest.java

package org.littlewings.rabbitmq.rpc;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RpcTest {
    // ここに、テストを書く!
}

Client側

では、まずはClient側のコードを書いてみます。

    public String client(String message) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String replyQueueName = channel.queueDeclare().getQueue();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);

        String uuid = UUID.randomUUID().toString();

        AMQP.BasicProperties properties =
                new AMQP.BasicProperties.Builder()
                        .correlationId(uuid)
                        .replyTo(replyQueueName)
                        .build();

        channel.basicPublish("", "rpc_queue", properties, message.getBytes(StandardCharsets.UTF_8));

        String receivedMessage;

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(uuid)) {
            receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
        } else {
            receivedMessage = null;
        }

        TimeUnit.SECONDS.sleep(5L);

        channel.close();
        connection.close();

        return receivedMessage;
    }

このメソッドは、最終的にはServer側から戻ってきたメッセージを返しています。

キューは簡易的に宣言していますが、ここではQueueingConsumerというクラスが登場します。

        String replyQueueName = channel.queueDeclare().getQueue();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);

このクラスをChannel#basicConsumeで登録しますが、これまではConsumerのサブクラスを作成していましたが、今回はこのQueueingConsumerを直接登録します。
また、Channel#queueDeclare#getQueueの結果をリプライ受信用のキュー名として作成し、このキューに対してQueueingConsumerを紐づけます。

次に、UUIDを作成してBasicPropertiesを組み立て、Channel#basicPublishでメッセージを送信します。

        String uuid = UUID.randomUUID().toString();

        AMQP.BasicProperties properties =
                new AMQP.BasicProperties.Builder()
                        .correlationId(uuid)
                        .replyTo(replyQueueName)
                        .build();

        channel.basicPublish("", "rpc_queue", properties, message.getBytes(StandardCharsets.UTF_8));

この時、先ほど作成したリプライ用のキューの名前をAMQP.BasicProperties.Builder#replyToに設定しています。メッセージの送信自体は、Channel#basicPublishで「rpc_queue」という名前のキューに対して行います。

要するに、送信用(rpc_queue)と受信用(replyQueueName)でキューを分けようということですね。

UUIDはなにに使っているかというと、Correlation Idと呼ばれるものとして扱っています。照合用のIDです。

なにを照合するのかというと、自分が送ったリクエストに対するレスポンスなのか、判断するためのIDを付けようということです。これを、AMQP.BasicProperties.Builder#correlationIdで設定します。

Client側は、この後にQueueingConsumer#nextDeliveryで受信したメッセージの中のCorrelation Idを自分が送ったものと同じかどうか確認します。Correlation Idが違えば、メッセージを無視します(というように実装しています)。

        String receivedMessage;

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(uuid)) {
            receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
        } else {
            receivedMessage = null;
        }

QueueingConsumer#nextDeliveryは、内部的にBlockingQueueで実装されているので、メッセージが届くまではブロックされます。

Server側

続いて、Server側です。

    public void server() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("rpc_queue", false, false, false, null);

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume("rpc_queue", false, consumer);

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();

        AMQP.BasicProperties properties = delivery.getProperties();
        AMQP.BasicProperties replyProperties =
                new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        String replyMessage = "★" + message + "★";

        channel.basicPublish("", properties.getReplyTo(), replyProperties, replyMessage.getBytes(StandardCharsets.UTF_8));

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        TimeUnit.SECONDS.sleep(5);

        channel.close();
        connection.close();
    }

こちらは、「rpc_queue」に対してQueueingConsumerを紐づけます。

        channel.queueDeclare("rpc_queue", false, false, false, null);

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume("rpc_queue", false, consumer);

これで、Client側からのメッセージを取得できるまで、待機します。

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();

メッセージが届いたら、QueueingConsumer.DeliveryよりBasicPropertiesを取得し、BasicPropertiesからクライアントが送信してきたCorrelation Idを取得してServer側でもBasicPropertiesを組み立てます。

        AMQP.BasicProperties properties = delivery.getProperties();
        AMQP.BasicProperties replyProperties =
                new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

そしてリプライ用のメッセージを組み立て(ここでは「★」を付けることにしました)、Client側が送信する時にReplyToに設定したQueueにChannel#basicPublishで送り返します。

        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        String replyMessage = "★" + message + "★";

        channel.basicPublish("", properties.getReplyTo(), replyProperties, replyMessage.getBytes(StandardCharsets.UTF_8));

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Client側は、ここで送り返されたメッセージを、送信時とは別のキューから読み取るというわけですね。

動かしてみる

最後は、テストコードです。

Server側を起動し、Client側がメッセージを送信、返却されたメッセージが送ったメッセージに「★」を加えたものであることが確認できます。

    @Test
    public void rpcCall() throws InterruptedException, IOException, TimeoutException {
        String message = "Hello World";

        ExecutorService es = Executors.newSingleThreadExecutor();
        es.submit(() -> {
            try {
                server();
            } catch (IOException | TimeoutException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        TimeUnit.SECONDS.sleep(2L);

        String replyMessage = client(message);

        assertThat(replyMessage)
                .isEqualTo("★Hello World★");
    }

動かせた感じですね。

まとめ

RabbitMQのチュートリアルの最後、RPCを試してみました。

RPCというより、2つのキューを使った処理でしたが、Client側はProducerでありConsumerでもある、Server側もConsumerでありProducerでもある、そんな構成になっています。

両者がけっこうがっちり噛んでしまうような感じなので、そう使わないような気もしますが…こういう使い方もあるんだなと覚えておきましょう。

これで、RabbitMQのチュートリアルはひととおりおしまいです。

2016-07-17

InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerではない場合)

こちらのエントリの続きです。

InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerの場合) - CLOVER

長さの関係で分割しただけなので、詳細は前のエントリをご覧ください…。

Local Nodeにエントリがない場合

続いて、Local Nodeにエントリがないパターン。この場合は、Cache#get時にもRPCが発生するはずですね。

こちらのパターンは、キーは「key4」が選ばれました。結果はこちら。

===== NOT ASSIGNED KEY = key4 START =====
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, entry
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, entry
              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [EntryFactory] EntryFactoryImpl:getFromContext, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, exit
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] ClusteredGetCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitGetCacheEntryCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitGetCacheEntryCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitGetCacheEntryCommand, entry
                        [Interceptor] StateTransferInterceptor:visitGetCacheEntryCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitGetCacheEntryCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitGetCacheEntryCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForReading, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:get, entry
                                  [DataContainer] DefaultDataContainer:get, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForReading, exit
                              [Interceptor] NonTxDistributionInterceptor:visitGetCacheEntryCommand, entry
                                [Interceptor] CallInterceptor:visitGetCacheEntryCommand, entry
                                  [Command] GetCacheEntryCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] GetCacheEntryCommand:perform, exit
                                [Interceptor] CallInterceptor:visitGetCacheEntryCommand, exit
                              [Interceptor] NonTxDistributionInterceptor:visitGetCacheEntryCommand, exit
                            [Interceptor] EntryWrappingInterceptor:visitGetCacheEntryCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitGetCacheEntryCommand, exit
                        [Interceptor] StateTransferInterceptor:visitGetCacheEntryCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitGetCacheEntryCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitGetCacheEntryCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitGetCacheEntryCommand, exit
                [Command] ClusteredGetCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
              [EntryFactory] EntryFactoryImpl:wrapExternalEntry, entry
                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
              [EntryFactory] EntryFactoryImpl:wrapExternalEntry, exit
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, entry
                [Command] GetKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] GetKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, exit
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, exit
===== NOT ASSIGNED KEY = key4 END =====

エントリがLocal Node(InvocationContextおよびDataContainer)にないので、RPCが発生していますね。

で、RPCを投げた後ですが中身を見てみると、ClusterGetCommand#performから先はLocal Nodeにデータがあった時と似たような動きをしていますね。
そりゃあそうだ、という感じかもですが。

              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] ClusteredGetCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitGetCacheEntryCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitGetCacheEntryCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitGetCacheEntryCommand, entry
                        [Interceptor] StateTransferInterceptor:visitGetCacheEntryCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitGetCacheEntryCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitGetCacheEntryCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForReading, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:get, entry
                                  [DataContainer] DefaultDataContainer:get, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForReading, exit
                              [Interceptor] NonTxDistributionInterceptor:visitGetCacheEntryCommand, entry
                                [Interceptor] CallInterceptor:visitGetCacheEntryCommand, entry
                                  [Command] GetCacheEntryCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] GetCacheEntryCommand:perform, exit
                                [Interceptor] CallInterceptor:visitGetCacheEntryCommand, exit
                              [Interceptor] NonTxDistributionInterceptor:visitGetCacheEntryCommand, exit
                            [Interceptor] EntryWrappingInterceptor:visitGetCacheEntryCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitGetCacheEntryCommand, exit
                        [Interceptor] StateTransferInterceptor:visitGetCacheEntryCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitGetCacheEntryCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitGetCacheEntryCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitGetCacheEntryCommand, exit
                [Command] ClusteredGetCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit

では、put時の挙動。

===== PUT KEY = key4 START
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] SingleRpcCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                  [Command] PutKeyValueCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] PutKeyValueCommand:perform, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                  [Command] SingleRpcCommand:perform, entry
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                                    [DataContainer] DefaultDataContainer:peek, entry
                                                    [DataContainer] DefaultDataContainer:peek, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                                    [Command] PutKeyValueCommand:perform, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                    [Command] PutKeyValueCommand:perform, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                                [DataContainer] DefaultDataContainer:peek, entry
                                                [DataContainer] DefaultDataContainer:peek, exit
                                                [DataContainer] DefaultDataContainer:put, entry
                                                [DataContainer] DefaultDataContainer:put, exit
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                                  [Command] SingleRpcCommand:perform, exit
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                              [DataContainer] DefaultDataContainer:peek, entry
                              [DataContainer] DefaultDataContainer:peek, exit
                              [DataContainer] DefaultDataContainer:put, entry
                              [DataContainer] DefaultDataContainer:put, exit
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                [Command] SingleRpcCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== PUT KEY = key4 END

Local Nodeにデータがあっても、Primary Ownerでない時と似たような感じになっていますね。

更新時。

===== UPDATE KEY = key4 START =====
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] SingleRpcCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                  [Command] PutKeyValueCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] PutKeyValueCommand:perform, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                  [Command] SingleRpcCommand:perform, entry
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                                    [DataContainer] DefaultDataContainer:peek, entry
                                                    [DataContainer] DefaultDataContainer:peek, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                                    [Command] PutKeyValueCommand:perform, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                    [Command] PutKeyValueCommand:perform, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                                [DataContainer] DefaultDataContainer:peek, entry
                                                [DataContainer] DefaultDataContainer:peek, exit
                                                [DataContainer] DefaultDataContainer:put, entry
                                                [DataContainer] DefaultDataContainer:put, exit
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                                  [Command] SingleRpcCommand:perform, exit
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                              [DataContainer] DefaultDataContainer:peek, entry
                              [DataContainer] DefaultDataContainer:peek, exit
                              [DataContainer] DefaultDataContainer:put, entry
                              [DataContainer] DefaultDataContainer:put, exit
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                [Command] SingleRpcCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== UPDATE KEY = key4 END =====

こちらも、そう変わらず。

まとめ

InfinispanのDistributed Cacheにおける、putとgetの挙動をトレースしてみました。

Interceptor、CommandやInvocationContext、DataContainerといったクラスの関係や、RPCの発生などについて基礎的なところは見れた感じですが、Commandのバリエーションはもっと多いですし、トランザクションの有無などで挙動が大きく変わりそうなので、ホント初歩ですよねぇと。

ただ、これまでボヤッとしか見ていなかった部分をそれなりにマジメに追ってみたので、その点は勉強になりました。

まー、このあたりのコード、Infinispan 9.0.0でガッツリ変わりそうですけどね!

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-trace-distributed-putget

2016-07-16

RabbitMQのTopicsを試してみる

RabbitMQのチュートリアルで遊んでみようなシリーズ、今度はTopicsを扱います。

Topics

このチュートリアルの前には、Routingというものを扱っているのですが、そこでは複数の条件で振り分けることができないという制限があったのでこれを改善するというものらしいです。

リンク先のチュートリアルではTopicと呼ばれるものを使用して、UnixLinuxにおけるsyslogライクなサンプルを試しています。

Topicとは

Topic Exchangeに送信されたメッセージは、ドット(.)で区切られた、ワードのリストであるrouting keyを構成することができます。ワード自体はなんでも構いませんが、通常はメッセージに対して意味、特徴のあるものを使用するでしょう。例えば、"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"など。

routing keyとして多くのワードを含むことができますが、その長さには255バイトまでの制限があります。

binding keyについても、同じ形式である必要があります。Topic Exchangeは、directと似通ったロジックを背後に備えています。特定のrouting keyと共に送信されたメッセージは、すべてのQueueに対してbinding keyにマッチするか確認が行われます。

しかし、binding keyには次の2つの重要な、特別な形式があります。

  • *(star) … ひとつのワードを代替可能
  • #(hash) … ゼロ、または複数のワードを代替可能

RabbitMQのチュートリアルでの説明は、これを

"<speed>.<colour>.<species>"

を例に解説しています。

このあたりは、このエントリではコードでパターンを示しながら書いていくとしましょう。

それでは、実際に使っていってみます。

準備

まず、RabbitMQ自体はすでにインストール、起動済みとします。また、認証が行われる前提のコードになっています。

実装を行うためのMaven依存関係は、以下の通り。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.1</version>
            <scope>test</scope>
        </dependency>

JUnitとAssertJは、テストコード用です。

テストコードの雛形

まず、テストコードの雛形部分を書いておきます。
src/test/java/org/littlewings/rabbitmq/topics/TopicsTest.java

package org.littlewings.rabbitmq.topics;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class TopicsTest {
    // ここに、テストを書く!
}

Producer側を書く

最初に、メッセージを送信するProducer側を書いていきましょう。

作成したコードは、こちら。

    public void publish(String routingKey, String... messages) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "topic");

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        channel.close();
        connection.close();
    }

routing keyは、メソッド呼び出し時に指定できるようにしました。

Channel#exchangeDeclareを呼び出す際に、第2引数(type)を「topic」にしているところがポイントです。

        channel.exchangeDeclare("messageExchange", "topic");

あとは、routing keyに沿ってChannel#publishします。

                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));

Consumer側

続いて、Consumer側を書いてみます。

作成したコードは、こちら。

    public List<String> subscribe(String... routingKeys) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "topic");
        String queueName = channel.queueDeclare().getQueue();

        Arrays.asList(routingKeys).forEach(routingKey -> {
            try {
                channel.queueBind(queueName, "messageExchange", routingKey);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) {
                receivedMessages.add(new String(bytes, StandardCharsets.UTF_8));
            }
        };

        channel.basicConsume(queueName, true, consumer);

        TimeUnit.SECONDS.sleep(5L);

        return receivedMessages;
    }

こちらも、routing key(というかbinding key)は複数受け取れるようにメソッドを宣言しています。

Channel#exchangeDeclareで「topic」を指定しているところは、Producer側と同じ。

        channel.exchangeDeclare("messageExchange", "topic");

あとは、Queueにバインドして送信されてくるメッセージを待ちます。

確認

それでは、これらのコードでTopicを使って確認してみましょう。

特に、「*」と「#」が特殊な意味を持つということでしたね。このあたりを踏まえて。

では、まず「*」入りのものから。

    @Test
    public void topicsAsterisk() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> infoLevelMessages = Arrays.asList("Connected target host.", "Disconnected target host.");
        List<String> errorLevelMessages = Arrays.asList("Connection timeout.", "Read timeout.");

        List<String> allLevelMessages = new ArrayList<>(infoLevelMessages);
        allLevelMessages.addAll(errorLevelMessages);

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(8);
        Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages"));
        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*"));
        Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*"));
        Future<List<String>> f4 = es.submit(() -> subscribe("network.*"));
        Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*"));
        Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*"));
        Future<List<String>> f7 = es.submit(() -> subscribe("*.*"));
        Future<List<String>> f8 = es.submit(() -> subscribe("*"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()]));
        publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f2.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f3.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f4.get())
                .isEmpty();
        assertThat(f5.get())
                .containsExactlyElementsOf(infoLevelMessages);
        assertThat(f6.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f7.get())
                .isEmpty();
        assertThat(f8.get())
                .isEmpty();

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

Consumer側を複数定義して、syslogっぽいワードで紐づけてみます。まったく「*」を含まない具体的なものから、一部のワードを「*」としたもの、3つのワードで今回は区切られていますが、数が足りないものなどなど。

        Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages"));
        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*"));
        Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*"));
        Future<List<String>> f4 = es.submit(() -> subscribe("network.*"));
        Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*"));
        Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*"));
        Future<List<String>> f7 = es.submit(() -> subscribe("*.*"));
        Future<List<String>> f8 = es.submit(() -> subscribe("*"));

で、メッセージ送信。キーはそれぞれ、「network.info.messages」と「network.error.messages」としました。INFOレベルとERRORレベル的な。

        publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()]));
        publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()]));

先ほどSubscribeの設定をした、どのConsumerがメッセージを受け取れているか、というテストになります。

結果は、こんな感じに。

        assertThat(f1.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f2.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f3.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f4.get())
                .isEmpty();
        assertThat(f5.get())
                .containsExactlyElementsOf(infoLevelMessages);
        assertThat(f6.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f7.get())
                .isEmpty();
        assertThat(f8.get())
                .isEmpty();

変数名の例が、ちょっと良くなかったかもですが…。

こういうのは「*」がワードを補完するのですべてのメッセージを受け取れていますが、

        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*"));

こういう「*」が含まれていても「.」区切りの数が足りないものは、メッセージを受信できていません。

        Future<List<String>> f4 = es.submit(() -> subscribe("network.*"));
        Future<List<String>> f7 = es.submit(() -> subscribe("*.*"));
        Future<List<String>> f8 = es.submit(() -> subscribe("*"));

なるほど、確かに

  • *(star) … ひとつのワードを代替可能

ひとつのワードを代替可能、ですね。

また、こういうのはすべてのメッセージを受信できていますし、

        Future<List<String>> f3 = es.submit(() -> subscribe("network.*.*"));
        Future<List<String>> f6 = es.submit(() -> subscribe("*.*.*"));

他のパターンだと、より具体的な部分でマッチしたメッセージだけが受信できていることが確認できます。

        Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages"));
        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.*"));
        Future<List<String>> f5 = es.submit(() -> subscribe("*.info.*"));

上2つはERRORレベルのみ、下はINFOレベルのみですね。

では、続いて今度は「#」を使ってみます。

    @Test
    public void topicsHash() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> infoLevelMessages = Arrays.asList("Connected target host.", "Disconnected target host.");
        List<String> errorLevelMessages = Arrays.asList("Connection timeout.", "Read timeout.");

        List<String> allLevelMessages = new ArrayList<>(infoLevelMessages);
        allLevelMessages.addAll(errorLevelMessages);

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(8);
        Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages"));
        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.#"));
        Future<List<String>> f3 = es.submit(() -> subscribe("network.#.#"));
        Future<List<String>> f4 = es.submit(() -> subscribe("network.#"));
        Future<List<String>> f5 = es.submit(() -> subscribe("#.info.#"));
        Future<List<String>> f6 = es.submit(() -> subscribe("#.#.#"));
        Future<List<String>> f7 = es.submit(() -> subscribe("#.#"));
        Future<List<String>> f8 = es.submit(() -> subscribe("#"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("network.info.messages", infoLevelMessages.toArray(new String[infoLevelMessages.size()]));
        publish("network.error.messages", errorLevelMessages.toArray(new String[errorLevelMessages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f2.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f3.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f4.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f5.get())
                .containsExactlyElementsOf(infoLevelMessages);
        assertThat(f6.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f7.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f8.get())
                .containsExactlyElementsOf(allLevelMessages);

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

Subscribeするパターンは先ほどとほぼ同じですが、「*」だった部分を「#」に変えています。

        Future<List<String>> f1 = es.submit(() -> subscribe("network.error.messages"));
        Future<List<String>> f2 = es.submit(() -> subscribe("network.error.#"));
        Future<List<String>> f3 = es.submit(() -> subscribe("network.#.#"));
        Future<List<String>> f4 = es.submit(() -> subscribe("network.#"));
        Future<List<String>> f5 = es.submit(() -> subscribe("#.info.#"));
        Future<List<String>> f6 = es.submit(() -> subscribe("#.#.#"));
        Future<List<String>> f7 = es.submit(() -> subscribe("#.#"));
        Future<List<String>> f8 = es.submit(() -> subscribe("#"));

すると、結果が先ほどとは変わってメッセージを受信しないConsumerがいなくなりました。

        assertThat(f1.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f2.get())
                .containsExactlyElementsOf(errorLevelMessages);
        assertThat(f3.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f4.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f5.get())
                .containsExactlyElementsOf(infoLevelMessages);
        assertThat(f6.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f7.get())
                .containsExactlyElementsOf(allLevelMessages);
        assertThat(f8.get())
                .containsExactlyElementsOf(allLevelMessages);

「*」を使っていた時は、「.」の部分を含めたワードの数が合わないものはメッセージを受信できていませんでしたが、「#」の場合は複数のワードとしても機能するからですね。

  • #(hash) … ゼロ、または複数のワードを代替可能

なるほど。

まとめ

RabbitMQのTopic Exchangeを使って、Routingのチュートリアルよりも複雑なメッセージの送受信の振り分けの例を書いてみました。

実際に使って書いてみると、動きがわかってなかなか面白いですね。

2016-07-15

InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerの場合)

少し前に、こういうエントリを書きました。

InfinispanのInterceptorの組み合わせを、Cacheの種類ごとに見てみる - CLOVER

Infinispanの各Cacheに対して、putやgetをした時にどういうInterceptorやCommmandが実行されるのか、見てみようというエントリです。

今回は、もう少し絞ってDistributed Cache(クラスタ構成)でのputやgetのトレースを、もうちょっと詳細に追ってみたいと思います。

ここでの目的は、以下です。

  • Distributed Cacheのput/getのトレース
  • 操作しているLocal NodeがキーのOwnerでなかった時に、どのような挙動をするのか見てみる
  • RPCのタイミングを見る
  • シリアライズ/デシリアライズのタイミングを見る
  • 末端でデータを保持している、DataContainerへのデータの取得/保存のタイミングを見る

このあたりを追うために、今回もBytemanのルールを仕込んでInfinispanをトレースしてみたいと思います。
※どうも書いた内容が長すぎたようなので、Local Nodeにデータがないパターンを別の記事にします

って、頑張って追うんですけど、このあたりの構造(特にInterceptorまわり)、Infinispan 9.0.0でガッツリ変わりそうですけどね!!

準備

まずは、準備作業。

ビルド定義は、このように。
build.sbt

name := "embedded-trace-distributed-putget"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

javaOptions in Test += "-javaagent:" + System.getenv("BYTEMAN_HOME") + "/lib/byteman.jar=script:trace.btm"

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "8.2.3.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

Java実行時に、Bytemanのルールを実行するようにしています。

Infinispanの設定は、以下の通り。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container default-cache="distributedCache">
        <jmx duplicate-domains="true"/>
        <transport cluster="cluster" stack="udp"/>

        <distributed-cache name="distributedCache"/>
    </cache-container>
</infinispan>

シンプルに、Distributed Cacheをひとつだけ定義しています。

JGroupsの設定は、端折ります。

テストコード

動作確認のためのテストコードは、以下のようなものを作成しました。
src/test/scala/org/littlewings/infinispan/tracedistributedcache/TraceDistributedCachePutGetSpec.scala

package org.littlewings.infinispan.tracedistributedcache

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager
import org.scalatest.{FunSpec, Matchers}

class TraceDistributedCachePutGetSpec extends FunSpec with Matchers {
  describe("Trace DistributedCache Spec") {
    it("trace, self assigned key, get") {
      withCache[String, String]("distributedCache", 3) { cache =>
        val keyRange = 1 to 10
        keyRange.foreach { i =>
          println(s"===== PUT KEY = key$i START")
          cache.put(s"key$i", s"value$i")
          println(s"===== PUT KEY = key$i END")
        }

        val keys = keyRange.map(i => s"key$i").toVector

        val dm = cache.getAdvancedCache.getDistributionManager
        val self = cache.getAdvancedCache.getRpcManager.getAddress

        val pickAssignedSelfKey = keys.find(key => dm.getLocality(key).isLocal)

        pickAssignedSelfKey should not be(None)

        pickAssignedSelfKey.foreach { key =>
          println(s"===== ASSIGNED KEY = $key START =====")
          cache.get(key) should be(key.replace("key", "value"))
          println(s"===== ASSIGNED KEY = $key END =====")

          println(s"===== RE LOOKUP KEY = $key START =====")
          cache.get(key) should be(key.replace("key", "value"))
          println(s"===== RE LOOKUP KEY = $key END =====")

          val newValue = cache.get(key) + "-new"

          println(s"===== UPDATE KEY = $key START =====")
          cache.put(key, newValue)
          println(s"===== UPDATE KEY = $key END =====")
        }
      }
    }

    it("trace, not self assigned key, get") {
      withCache[String, String]("distributedCache", 3) { cache =>
        val keyRange = 1 to 10
        keyRange.foreach { i =>
          println(s"===== PUT KEY = key$i START")
          cache.put(s"key$i", s"value$i")
          println(s"===== PUT KEY = key$i END")
        }

        val keys = keyRange.map(i => s"key$i").toVector

        val dm = cache.getAdvancedCache.getDistributionManager
        val self = cache.getAdvancedCache.getRpcManager.getAddress

        val pickNotAssignedSelfKey = keys.find(key => !dm.getLocality(key).isLocal)

        pickNotAssignedSelfKey should not be(None)

        pickNotAssignedSelfKey.foreach { key =>
          println(s"===== NOT ASSIGNED KEY = $key START =====")
          cache.get(key) should be(key.replace("key", "value"))
          println(s"===== NOT ASSIGNED KEY = $key END =====")

          println(s"===== RE LOOKUP KEY = $key START =====")
          cache.get(key) should be(key.replace("key", "value"))
          println(s"===== RE LOOKUP KEY = $key END =====")

          val newValue = cache.get(key) + "-new"

          println(s"===== UPDATE KEY = $key START =====")
          cache.put(key, newValue)
          println(s"===== UPDATE KEY = $key END =====")
        }
      }
    }
  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)
      fun(cache)
    } finally {
      managers.foreach(_.getCache[K, V](cacheName).stop())
      managers.foreach(_.stop())
    }
  }
}

各テスト実行時に、InfinispanのNodeを3つ起動し、データを10件登録します。続いて、テストケースに応じてLocal Nodeにデータがあるもの、ないものをそれぞれgetします。そして、最後にすでに存在するキーに対応するエントリを更新します。

この時に、Bytemanのルールでトレースしてみて、挙動を見てみようという作戦です。

作成したBytemanルールとトレース対象

最終的に作成したルールは、こちらになります。
※長いのでリンクで

https://github.com/kazuhira-r/infinispan-getting-started/blob/master/embedded-trace-distributed-putget/trace.btm

何を追っているかというと

  • DataContainerへのput/get/peek
  • EntryLookup(というかInvocationContext)からのエントリの取得/登録(lookupEntry/putLookedUpEntry)
  • EntryFactoryからのエントリの取得/登録、DataContainerへのアクセスのタイミング(InvocationContextとDataContainerへのアクセスをラップ)
  • Marshallerを使用したシリアライズ/デシリアライズ
  • RpcManager、BaseDistributionInterceptorのサブクラスによる、RPC実行のタイミング
  • Visitorインターフェースの実装クラスによる、各種visitメソッドの実行
  • ReplicableCommandインターフェースの実装クラスによる、各種コマンドの実行

という感じです。

と、言葉だけ並べてもよくわからない感じなので、もう少し言葉を見ておきましょう。

DataContainerというのは、Infinispanの内部でデータを保持しているコンテナです。DefaultDataContainerという実装クラスがあり、

   private final ConcurrentMap<K, InternalCacheEntry<K, V>> entries;

ここにCacheのエントリが保持されています。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/container/DefaultDataContainer.java#L61

DataContainerはMapライクな操作を持つくらいのものなので、今回はput、get、peekメソッドにトレースを入れてデータの取得や登録のタイミングを追います。

次に、InvocationContext(とEntryLookup、EntryFactory)です。

InvocationContextというのは、InfinispanのInterceptorで各種visitメソッドを実行していく際、Interceptorをチェインしていく時に引き継がれていくインスタンスです。最終的に、Interceptorのチェインの末端にあるCommandの実装まで渡されます。

InvocationContext自体は、Cacheの実装クラスによってコマンドの呼び出し時にどのような性格のInvocationContextを生成するのかを決定します。

put。

   @SuppressWarnings("unchecked")
   final V put(K key, V value, Metadata metadata,
         EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) {
      assertKeyValueNotNull(key, value);
      InvocationContext ctx = getInvocationContextWithImplicitTransaction(false, explicitClassLoader, 1);
      return putInternal(key, value, metadata, explicitFlags, ctx);
   }

get。

   @SuppressWarnings("unchecked")
   final V get(Object key, EnumSet<Flag> explicitFlags, ClassLoader explicitClassLoader) {
      assertKeyNotNull(key);
      InvocationContext ctx = getInvocationContextForRead(explicitClassLoader, 1);
      GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(key, explicitFlags);
      return (V) invoker.invoke(ctx, command);
   }

https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L1110
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L735
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L409
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java#L742

Write用、Read用といった感じで。

で、末端で実行されるCommand、例えばCache#get時に呼び出されるGetKeyValueCommandというクラスは、実はDataContainerへのアクセスは行いません。このCommandに到達するまでに、InterceotprのチェインのどこかでInvocationContextにエントリが設定されており、それを返すといった処理を行います。

GetCommand#perform。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/commands/read/GetKeyValueCommand.java#L53

   @Override
   public Object perform(InvocationContext ctx) throws Throwable {
      CacheEntry entry = ctx.lookupEntry(key);

PutKeyValueCommand#perform。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java#L98

   @Override
   public Object perform(InvocationContext ctx) throws Throwable {
      // It's not worth looking up the entry if we're never going to apply the change.
      if (valueMatcher == ValueMatcher.MATCH_NEVER) {
         successful = false;
         return null;
      }
      MVCCEntry e = (MVCCEntry) ctx.lookupEntry(key);

この過程で、誰がInvocationContext(EntryLookup)にエントリを設定するのかを追う、という感じですね。

Marshallerについては、単純にシリアライズ/デシリアライズのタイミングを見てみるだけです。

あとは、気になるところとしてRPCの実行タイミングを見るために、RpcManagerとBaseDistributionInterceptorに対してトレースを仕込みます。Interceptorでは、他Nodeへのアクセスの際には、これらのクラスの機能を使っているようなので。これで、ネットワーク通信のタイミングを見てみようという魂胆です。

実行してみる

それでは、実行して結果をトレースしてみましょう。

Local Nodeにエントリがある場合

コード上、最初にエントリをputしてからgetするようになっているのですが、結果はgetから見てみましょう。

getのトレース結果。
※一部端折っているのと、見やすいようにインデントを入れています

今回は、Local Nodeに割り当てられたキーとして、「key1」を選択したようです。

===== ASSIGNED KEY = key1 START =====
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, entry
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, entry
              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [EntryFactory] EntryFactoryImpl:getFromContext, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                [DataContainer] DefaultDataContainer:get, entry
                [DataContainer] DefaultDataContainer:get, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, exit
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, entry
                [Command] GetKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] GetKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitGetKeyValueCommand, exit
            [Interceptor] NonTxDistributionInterceptor:visitGetKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitGetKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitGetKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitGetKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitGetKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitGetKeyValueCommand, exit
===== ASSIGNED KEY = key1 END =====

これを見ると、EntryWrappingInterceptor:visitGetKeyValueCommandで、InvocationContextにエントリがなかったら、DataContainerにエントリを取りに行っている感じのように見えますね。そして、取得したエントリInvocationContext#putLookedUpEntryで登録する、と。

          [Interceptor] EntryWrappingInterceptor:visitGetKeyValueCommand, entry
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, entry
              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [EntryFactory] EntryFactoryImpl:getFromContext, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                [DataContainer] DefaultDataContainer:get, entry
                [DataContainer] DefaultDataContainer:get, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
            [EntryFactory] EntryFactoryImpl:wrapEntryForReading, exit

シリアライズは出てきませんね、と。

なので、GetKeyValueCommand:performでは単純にInvocationContextから結果を取得しているだけです。

                [Command] GetKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] GetKeyValueCommand:perform, exit

では、登録時(put)はどうでしょう。

こんな感じになりました。

===== PUT KEY = key1 START
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] SingleRpcCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                  [Command] PutKeyValueCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] PutKeyValueCommand:perform, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                  [Command] SingleRpcCommand:perform, entry
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                                    [DataContainer] DefaultDataContainer:peek, entry
                                                    [DataContainer] DefaultDataContainer:peek, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                                    [Command] PutKeyValueCommand:perform, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                    [Command] PutKeyValueCommand:perform, exit
                                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                                [DataContainer] DefaultDataContainer:peek, entry
                                                [DataContainer] DefaultDataContainer:peek, exit
                                                [DataContainer] DefaultDataContainer:put, entry
                                                [DataContainer] DefaultDataContainer:put, exit
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                                  [Command] SingleRpcCommand:perform, exit
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                              [DataContainer] DefaultDataContainer:peek, entry
                              [DataContainer] DefaultDataContainer:peek, exit
                              [DataContainer] DefaultDataContainer:put, entry
                              [DataContainer] DefaultDataContainer:put, exit
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                [Command] SingleRpcCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== PUT KEY = key1 END

長!!

なんか、2回RPCが投げられていますね。はて?

RPCの実行を指示しているのは、このあたりです。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java#L279

RPCの実行前後には、Marshallerが呼び出され、シリアライズ/デシリアライズが行われるっぽい感じのことが読み取れます。

また、トレース結果からはわからないのですが、直接DataContainer#putを呼び出しているのは今回はReadCommittedEntryのようです。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java#L168

そして、コミット指示をしているのはEntryWrappingInterceptorですと。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/interceptors/EntryWrappingInterceptor.java#L530

あくまでPutKeyValueCommandはInvocationContextを操作しているだけで、DataContainerへの反映はInterceptorみたいですね。

                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                              [DataContainer] DefaultDataContainer:peek, entry
                              [DataContainer] DefaultDataContainer:peek, exit
                              [DataContainer] DefaultDataContainer:put, entry
                              [DataContainer] DefaultDataContainer:put, exit
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit

InvocationContextへのエントリの登録は、EntryWrappingInterceptorでの処理内で行われる感じですね。

                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit

では、エントリを更新してみます。

===== UPDATE KEY = key1 START =====
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] SingleRpcCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                  [Command] PutKeyValueCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] PutKeyValueCommand:perform, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [RpcManager] RpcManagerImpl:invokeRemotely, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                  [Command] SingleRpcCommand:perform, entry
                                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                                    [DataContainer] DefaultDataContainer:peek, entry
                                                    [DataContainer] DefaultDataContainer:peek, exit
                                                  [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                                                  [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                                                  [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                    [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                                      [Command] PutKeyValueCommand:perform, entry
                                                        [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                                        [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                                      [Command] PutKeyValueCommand:perform, exit
                                                    [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                                  [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                                  [DataContainer] DefaultDataContainer:peek, entry
                                                  [DataContainer] DefaultDataContainer:peek, exit
                                                  [DataContainer] DefaultDataContainer:put, entry
                                                  [DataContainer] DefaultDataContainer:put, exit
                                                [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                                              [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                                            [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                                          [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                                        [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                                      [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                                    [Command] SingleRpcCommand:perform, exit
                                    [Marshaller] GlobalMarshaller:objectToBuffer, entry
                                    [Marshaller] GlobalMarshaller:objectToBuffer, exit
                                    [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                                    [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                                  [RpcManager] RpcManagerImpl:invokeRemotely, exit
                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                [DataContainer] DefaultDataContainer:peek, entry
                                [DataContainer] DefaultDataContainer:peek, exit
                                [DataContainer] DefaultDataContainer:put, entry
                              [DataContainer] DefaultDataContainer:put, exit
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                [Command] SingleRpcCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== UPDATE KEY = key1 END =====

あれ?そんなに変わらない?

どうも、エントリのOwnerがPrimaryかそうでないかで、挙動が変わりそうな気がします。

そこで、キーの抽出条件を変更。

        // val pickAssignedSelfKey = keys.find(key => dm.getLocality(key).isLocal)
        val pickAssignedSelfKey = keys.find(key => dm.getLocality(key).isLocal && dm.getPrimaryLocation(key) == self)

これで、再度実行してみます。getの時の挙動は変わらないので、初回のputと更新時を追ってみます。
※今度はキーが「key2」に変わりました…

登録時。

===== PUT KEY = key2 START
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [EntryFactory] EntryFactoryImpl:getFromContext, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                [DataContainer] DefaultDataContainer:peek, entry
                [DataContainer] DefaultDataContainer:peek, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
            [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
            [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
          [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
          [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
            [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
            [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
            [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
              [Command] PutKeyValueCommand:perform, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Command] PutKeyValueCommand:perform, exit
            [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
            [RpcManager] RpcManagerImpl:invokeRemotely, entry
              [Marshaller] GlobalMarshaller:objectToBuffer, entry
              [Marshaller] GlobalMarshaller:objectToBuffer, exit
              [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
              [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [Command] SingleRpcCommand:perform, entry
                [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                            [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                              [EntryFactory] EntryFactoryImpl:getFromContext, exit
                              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                [DataContainer] DefaultDataContainer:peek, entry
                                [DataContainer] DefaultDataContainer:peek, exit
                              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                            [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                  [Command] PutKeyValueCommand:perform, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                    [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Command] PutKeyValueCommand:perform, exit
                                [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                [DataContainer] DefaultDataContainer:peek, entry
                                [DataContainer] DefaultDataContainer:peek, exit
                                [DataContainer] DefaultDataContainer:put, entry
                                [DataContainer] DefaultDataContainer:put, exit
                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                  [Command] SingleRpcCommand:perform, exit
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
            [DataContainer] DefaultDataContainer:peek, entry
            [DataContainer] DefaultDataContainer:peek, exit
            [DataContainer] DefaultDataContainer:put, entry
            [DataContainer] DefaultDataContainer:put, exit
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== PUT KEY = key2 END

なんか、だいぶすっきりしてDataContainerへのアクセスが早い段階で登場するようになりましたね?

どうなってるんでしょう?ってことで、もう少し追ってみると、やっぱりエントリのOwnerがPrimaryかそうでないかで挙動が変わるようです。
https://github.com/infinispan/infinispan/blob/8.2.3.Final/core/src/main/java/org/infinispan/interceptors/EntryWrappingInterceptor.java#L234-L238

         if (isUsingLockDelegation || isTransactional) {
            result = cdl.localNodeIsPrimaryOwner(key) || (cdl.localNodeIsOwner(key) && !ctx.isOriginLocal());
         } else {
            result = cdl.localNodeIsOwner(key);
         }

なるほど。ということは、先ほどの例だと、Local Nodeにバックアップがあった状態で実行していたということですね。

更新時。

===== UPDATE KEY = key2 START =====
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
      [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
        [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
          [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
            [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
              [EntryFactory] EntryFactoryImpl:getFromContext, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [EntryFactory] EntryFactoryImpl:getFromContext, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                [DataContainer] DefaultDataContainer:peek, entry
                [DataContainer] DefaultDataContainer:peek, exit
              [EntryFactory] EntryFactoryImpl:getFromContainer, exit
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
            [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
            [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
              [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                [Command] PutKeyValueCommand:perform, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                [Command] PutKeyValueCommand:perform, exit
              [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
              [RpcManager] RpcManagerImpl:invokeRemotely, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, entry
                [Marshaller] GlobalMarshaller:objectToBuffer, exit
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [Command] SingleRpcCommand:perform, entry
                  [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, entry
                    [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, entry
                      [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, entry
                        [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, entry
                          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, entry
                            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, entry
                              [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, entry
                                [EntryFactory] EntryFactoryImpl:getFromContext, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                [EntryFactory] EntryFactoryImpl:getFromContext, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, entry
                                  [DataContainer] DefaultDataContainer:peek, entry
                                  [DataContainer] DefaultDataContainer:peek, exit
                                [EntryFactory] EntryFactoryImpl:getFromContainer, exit
                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:putLookedUpEntry, exit
                                [EntryFactory] EntryFactoryImpl:wrapEntryForWriting, exit
                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                  [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, entry
                                    [Command] PutKeyValueCommand:perform, entry
                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, entry
                                      [EntryLookup] SingleKeyNonTxInvocationContext:lookupEntry, exit
                                    [Command] PutKeyValueCommand:perform, exit
                                  [Interceptor] CallInterceptor:visitPutKeyValueCommand, exit
                                [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
                                [DataContainer] DefaultDataContainer:peek, entry
                                [DataContainer] DefaultDataContainer:peek, exit
                                [DataContainer] DefaultDataContainer:put, entry
                                [DataContainer] DefaultDataContainer:put, exit
                              [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
                            [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
                          [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
                        [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
                      [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
                    [Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
                  [Command] SingleRpcCommand:perform, exit
                  [Marshaller] GlobalMarshaller:objectToBuffer, entry
                  [Marshaller] GlobalMarshaller:objectToBuffer, exit
                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, entry
                  [Marshaller] GlobalMarshaller:objectFromByteBuffer, exit
                [RpcManager] RpcManagerImpl:invokeRemotely, exit
              [Interceptor] NonTxDistributionInterceptor:visitPutKeyValueCommand, exit
              [DataContainer] DefaultDataContainer:peek, entry
              [DataContainer] DefaultDataContainer:peek, exit
              [DataContainer] DefaultDataContainer:put, entry
              [DataContainer] DefaultDataContainer:put, exit
            [Interceptor] EntryWrappingInterceptor:visitPutKeyValueCommand, exit
          [Interceptor] NonTransactionalLockingInterceptor:visitPutKeyValueCommand, exit
       [Interceptor] StateTransferInterceptor:visitPutKeyValueCommand, exit
    [Interceptor] CacheMgmtInterceptor:visitPutKeyValueCommand, exit
  [Interceptor] InvocationContextInterceptor:visitPutKeyValueCommand, exit
[Interceptor] DistributionBulkInterceptor:visitPutKeyValueCommand, exit
===== UPDATE KEY = key2 END =====

更新時も、登録時とそう変わりませんね。

Primaryかどうかで、けっこう変わるものなんですねぇ…。

いったんここまで

ここらで長さが限界のようなので、続きは次の記事へ。
InfinispanのDistributed Cacheのput/getをトレースする(Local NodeがOnwerではない場合) - CLOVER

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-trace-distributed-putget