Hatena::ブログ(Diary)

CLOVER

2018-05-20

jcmdメモ

そろそろいい加減に、jpsから各種のJDK付属のツールからjcmdに慣れていこうかなと思い、個人的にメモとしてまとめることにしました。

既存のコマンドと、対比させる形で書いていきます。

参考

参考にしたのは、このあたり。

jcmdユーティリティ

jcmd

jcmdを使ったプロセスの指定方法は、これだけあるんですね。

プロセス識別子(pid)またはメイン・クラス(main-class)を1番目の引数として指定すると、jcmdは診断コマンド要求を、指定した識別子を持つJavaプロセスまたは指定したメインクラス名を持つすべてのJavaプロセスに送信します。また、プロセス識別子として0を指定すると、診断コマンド要求を使用可能なすべてのJavaプロセスに送信できます。診断コマンド要求として次のいずれかを使用します。

https://docs.oracle.com/javase/jp/8/docs/technotes/tools/windows/jcmd.html

ここでは、主にPIDを使っていきますが。

jcmd と既存ツールの対応 - sugarlife's blog

jcmdを試す - abcdefg.....

Java7からのjcmdのススメ(ThreadDump/HeapDump他)

環境

コマンドを確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.16.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

確認用のプログラム

今回のサンプルとして、こんなのを用意。
example/App.java

package example;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class App {
    public static void main(String... args) {
        String message = args[0];
        
        Thread t = new Thread(() -> {
                while (true) {
                    System.out.println(LocalDateTime.now() + " Hello " + message);

                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                }
            }, "my-thread");

        t.start();

        try {
            t.join();
        } catch (InterruptedException e) {
            // ignore
        }
    }
}

実行コマンドは、こちら。

$ java -Xmx512M -Dmy.property=foo -cp classes example.App World
2018-05-20T22:23:18.769 Hello World
2018-05-20T22:23:19.771 Hello World
2018-05-20T22:23:20.772 Hello World

では、順次見ていってみます。

ヘルプの標示方法

シンプルに、「-h」オプション。

$ jcmd -h
Usage: jcmd <pid | main class> <command ...|PerfCounter.print|-f file>
   or: jcmd -l                                                    
   or: jcmd -h                                                    
                                                                  
  command must be a valid jcmd command for the selected jvm.      
  Use the command "help" to see which commands are available.   
  If the pid is 0, commands will be sent to all Java processes.   
  The main class argument will be used to match (either partially 
  or fully) the class used to start Java.                         
  If no options are given, lists Java processes (same as -p).     
                                                                  
  PerfCounter.print display the counters exposed by this process  
  -f  read and execute commands from the file                     
  -l  list JVM processes on the local machine                     
  -h  this help

PID(もしくはmain class)を指定するタイプのものは、まずは使えるコマンドを「[PID] help」で標示可能。

$ jcmd [PID] help 
18586:
The following commands are available:
VM.native_memory
ManagementAgent.stop
ManagementAgent.start_local
ManagementAgent.start
GC.rotate_log
Thread.print
GC.class_stats
GC.class_histogram
GC.heap_dump
GC.run_finalization
GC.run
VM.uptime
VM.flags
VM.system_properties
VM.command_line
VM.version
help

For more information about a specific command use 'help <command>'.

さらに、PIDを指定しつつ「help」とコマンドを入力すると、コマンドごとに詳しい内容が標示されます。

$ jcmd [PID] help Thread.print
18586:
Thread.print
Print all threads with stacktraces.

Impact: Medium: Depends on the number of threads.

Permission: java.lang.management.ManagementPermission(monitor)

Syntax : Thread.print [options]

Options: (options must be specified using the <key> or <key>=<value> syntax)
	-l : [optional] print java.util.concurrent locks (BOOLEAN, false)

Javaプロセス一覧(jps相当)

「jps -lm」と同じであれば、以下になります。

$ jcmd

出力例。

18660 sun.tools.jcmd.JCmd
18586 example.App World

または

$ jcmd -l

出力例。

18680 sun.tools.jcmd.JCmd -l
18586 example.App World

「-l」の有無で、出力内容に差はありません。

jpsでの「-v」相当の情報は、これだと取れません。

起動引数を取得したい場合は、こちら。

$ jcmd [PID] VM.command_line

出力例。

18586:
VM Arguments:
jvm_args: -Xmx512M -Dmy.property=foo 
java_command: example.App World
java_class_path (initial): classes
Launcher Type: SUN_STANDARD

「jps -mlv」に比べると、ちょっと面倒…。

パフォーマンスカウンタ(jstat相当)

$ jcmd [PID] PerfCounter.print

出力例。

18586:
java.ci.totalTime=266593286
java.cls.loadedClasses=697
java.cls.sharedLoadedClasses=0
java.cls.sharedUnloadedClasses=0
java.cls.unloadedClasses=8
java.property.java.class.path="classes"
java.property.java.endorsed.dirs="/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/endorsed"
java.property.java.ext.dirs="/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext:/usr/java/packages/lib/ext"
java.property.java.home="/usr/lib/jvm/java-8-openjdk-amd64/jre"
java.property.java.library.path="/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib"
java.property.java.version="1.8.0_171"
java.property.java.vm.info="mixed mode"
java.property.java.vm.name="OpenJDK 64-Bit Server VM"
java.property.java.vm.specification.name="Java Virtual Machine Specification"
java.property.java.vm.specification.vendor="Oracle Corporation"
java.property.java.vm.specification.version="1.8"
java.property.java.vm.vendor="Oracle Corporation"
java.property.java.vm.version="25.171-b11"
java.rt.vmArgs="-Xmx512M -Dmy.property=foo"
java.rt.vmFlags=""
java.threads.daemon=4
java.threads.live=6
java.threads.livePeak=6
java.threads.started=6

〜省略〜

sun.gc.generation.0.capacity=50331648
sun.gc.generation.0.maxCapacity=178782208
sun.gc.generation.0.minCapacity=50331648
sun.gc.generation.0.name="new"
sun.gc.generation.0.space.0.capacity=37748736
sun.gc.generation.0.space.0.initCapacity=0
sun.gc.generation.0.space.0.maxCapacity=177733632
sun.gc.generation.0.space.0.name="eden"
sun.gc.generation.0.space.0.used=754992
sun.gc.generation.0.space.1.capacity=6291456
sun.gc.generation.0.space.1.initCapacity=0
sun.gc.generation.0.space.1.maxCapacity=59244544
sun.gc.generation.0.space.1.name="s0"
sun.gc.generation.0.space.1.used=0
sun.gc.generation.0.space.2.capacity=6291456
sun.gc.generation.0.space.2.initCapacity=0
sun.gc.generation.0.space.2.maxCapacity=59244544
sun.gc.generation.0.space.2.name="s1"
sun.gc.generation.0.space.2.used=0
sun.gc.generation.0.spaces=3
sun.gc.generation.1.capacity=38797312
sun.gc.generation.1.maxCapacity=358088704
sun.gc.generation.1.minCapacity=100663296
sun.gc.generation.1.name="old"
sun.gc.generation.1.space.0.capacity=38797312
sun.gc.generation.1.space.0.initCapacity=100663296
sun.gc.generation.1.space.0.maxCapacity=358088704
sun.gc.generation.1.space.0.name="old"
sun.gc.generation.1.space.0.used=869608
sun.gc.generation.1.spaces=1
sun.gc.lastCause="Heap Inspection Initiated GC"
sun.gc.metaspace.capacity=4980736
sun.gc.metaspace.maxCapacity=1082130432
sun.gc.metaspace.minCapacity=0
sun.gc.metaspace.used=4482840

〜省略〜

sun.rt.applicationTime=1793257092448
sun.rt.createVmBeginTime=1526823101812
sun.rt.createVmEndTime=1526823101858
sun.rt.internalVersion="OpenJDK 64-Bit Server VM (25.171-b11) for linux-amd64 JRE (1.8.0_171-8u171-b11-0ubuntu0.16.04.1-b11), built on Apr 27 2018 17:19:03 by "buildd" with gcc 5.4.0 20160609"
sun.rt.interruptedBeforeIO=0
sun.rt.interruptedDuringIO=0
sun.rt.javaCommand="example.App World"
sun.rt.jvmCapabilities="1100000000000000000000000000000000000000000000000000000000000000"
sun.rt.jvmVersion=430637067
sun.rt.safepointSyncTime=1671970
sun.rt.safepointTime=1856685473
sun.rt.safepoints=21
sun.rt.threadInterruptSignaled=0
sun.rt.vmInitDoneTime=1526823101846
sun.threads.vmOperationTime=1853640557
sun.urlClassLoader.readClassBytesTime=1078438
sun.zip.zipFile.openTime=1672337
sun.zip.zipFiles=10

膨大な量が1回表示されて終了するので、厳密に「相当」とは言いづらい感じが。

スレッドダンプ(jstack相当)

$ jcmd [PID] Thread.print

出力例。

18586:
2018-05-20 22:32:44
Full thread dump OpenJDK 64-Bit Server VM (25.171-b11 mixed mode):

"Attach Listener" #11 daemon prio=9 os_prio=0 tid=0x00007f99b0001000 nid=0x4963 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"my-thread" #10 prio=5 os_prio=0 tid=0x00007f99f019f800 nid=0x48c0 waiting on condition [0x00007f99c51bc000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:340)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
	at example.App.lambda$main$0(App.java:15)
	at example.App$$Lambda$1/455659002.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007f99f00ca000 nid=0x48ac runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007f99f00c7000 nid=0x48ab waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

〜省略〜

"VM Thread" os_prio=0 tid=0x00007f99f0080000 nid=0x48a4 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f99f001f800 nid=0x489c runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f99f0021800 nid=0x489d runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f99f0023000 nid=0x489e runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f99f0025000 nid=0x489f runnable 

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f99f0026800 nid=0x48a0 runnable 

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f99f0028800 nid=0x48a1 runnable 

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f99f002a000 nid=0x48a2 runnable 

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f99f002c000 nid=0x48a3 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f99f00cc800 nid=0x48ad waiting on condition 

JNI global references: 320


「jstack -F」(強制出力)とか「jstack -m」(ネイティブC/C++フレームの情報)相当を実現する方法はないのかな?

ヒープダンプ(jmap -dump:live,format=b相当)

$ jcmd [PID] GC.heap_dump filename=[Heap Dump Filename]

出力例。

18586:
Heap dump file created

「-all」オプションをつけることで、非参照オブジェクトもダンプ可能。

Options: (options must be specified using the <key> or <key>=<value> syntax)
	-all : [optional] Dump all objects, including unreachable objects (BOOLEAN, false)

ヒープヒストグラム(jmap -histo:live相当)

$ jcmd [PID] GC.class_histogram

出力例。

18586:

 num     #instances         #bytes  class name
----------------------------------------------
   1:           795         223600  [B
   2:          2977         196800  [C
   3:           806          92176  java.lang.Class
   4:          2965          71160  java.lang.String
   5:          1493          47776  java.util.concurrent.ConcurrentHashMap$Node
   6:           870          41032  [Ljava.lang.Object;
   7:           273          13664  [I
   8:           194          10864  java.lang.invoke.MemberName
   9:            14          10528  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  10:           288           9216  java.util.HashMap$Node

〜省略〜

 285:             1             16  sun.misc.Launcher
 286:             1             16  sun.misc.Launcher$Factory
 287:             1             16  sun.misc.Perf
 288:             1             16  sun.misc.Unsafe
 289:             1             16  sun.net.www.protocol.file.Handler
 290:             1             16  sun.reflect.ReflectionFactory
 291:             1             16  sun.util.calendar.Gregorian
Total         14808         869608

こちらも、「-all」を付与することで、非参照オブジェクトも表示可能です。

フラグの有効有無の確認(jinfo -flag相当)

$ jcmd [PID] VM.flags

出力例。

18586:
-XX:CICompilerCount=4 -XX:InitialHeapSize=150994944 -XX:MaxHeapSize=536870912 -XX:MaxNewSize=178782208 -XX:MinHeapDeltaBytes=524288 -XX:NewSize=50331648 -XX:OldSize=100663296 -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseParallelGC

「-all」オプションを付与することで、全フラグの状態が出力されます。

$ jcmd [PID] VM.flags -all

Options: (options must be specified using the <key> or <key>=<value> syntax)
	-all : [optional] Print all flags supported by the VM (BOOLEAN, false)

出力例。

18586:
[Global flags]
    uintx AdaptiveSizeDecrementScaleFactor          = 4                                   {product}
    uintx AdaptiveSizeMajorGCDecayTimeScale         = 10                                  {product}
    uintx AdaptiveSizePausePolicy                   = 0                                   {product}
    uintx AdaptiveSizePolicyCollectionCostMargin    = 50                                  {product}
    uintx AdaptiveSizePolicyInitializingSteps       = 20                                  {product}
    uintx AdaptiveSizePolicyOutputInterval          = 0                                   {product}
    uintx AdaptiveSizePolicyWeight                  = 10                                  {product}
    uintx AdaptiveSizeThroughPutPolicy              = 0                                   {product}
    uintx AdaptiveTimeWeight                        = 25                                  {product}
     bool AdjustConcurrency                         = false                               {product}
     bool AggressiveOpts                            = false                               {product}
     intx AliasLevel                                = 3                                   {C2 product}
     bool AlignVector                               = true                                {C2 product}

〜省略〜

     bool UseXmmLoadAndClearUpper                   = true                                {ARCH product}
     bool UseXmmRegToRegMoveAll                     = true                                {ARCH product}
     bool VMThreadHintNoPreempt                     = false                               {product}
     intx VMThreadPriority                          = -1                                  {product}
     intx VMThreadStackSize                         = 1024                                {pd product}
     intx ValueMapInitialSize                       = 11                                  {C1 product}
     intx ValueMapMaxLoopSize                       = 8                                   {C1 product}
     intx ValueSearchLimit                          = 1000                                {C2 product}
     bool VerifyMergedCPBytecodes                   = true                                {product}
     bool VerifySharedSpaces                        = false                               {product}
     intx WorkAroundNPTLTimedWaitHang               = 1                                   {product}
    uintx YoungGenerationSizeIncrement              = 20                                  {product}
    uintx YoungGenerationSizeSupplement             = 80                                  {product}
    uintx YoungGenerationSizeSupplementDecay        = 8                                   {product}
    uintx YoungPLABSize                             = 4096                                {product}
     bool ZeroTLAB                                  = false                               {product}
     intx hashCode                                  = 5                                   {product}

まるで、「-XX:+PrintFlagsFinal」みたいですね。

システムプロパティ一覧(jinfo -sysprops相当)

$ jcmd [PID] VM.system_properties

出力例。

18586:
#Sun May 20 23:15:08 JST 2018
java.runtime.name=OpenJDK Runtime Environment
sun.boot.library.path=/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64
java.vm.version=25.171-b11
java.vm.vendor=Oracle Corporation
java.vendor.url=http\://java.oracle.com/
my.property=foo
path.separator=\:
java.vm.name=OpenJDK 64-Bit Server VM
file.encoding.pkg=sun.io
user.country=JP
sun.java.launcher=SUN_STANDARD
sun.os.patch.level=unknown
java.vm.specification.name=Java Virtual Machine Specification

〜省略〜

java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
sun.arch.data.model=64
user.language=ja
java.specification.vendor=Oracle Corporation
awt.toolkit=sun.awt.X11.XToolkit
java.vm.info=mixed mode
java.version=1.8.0_171
java.ext.dirs=/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext\:/usr/java/packages/lib/ext
sun.boot.class.path=/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/resources.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/rt.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/sunrsasign.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jsse.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jce.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/charsets.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/jfr.jar\:/usr/lib/jvm/java-8-openjdk-amd64/jre/classes
java.vendor=Oracle Corporation
file.separator=/
java.vendor.url.bug=http\://bugreport.sun.com/bugreport/
sun.io.unicode.encoding=UnicodeLittle
sun.cpu.endian=little
sun.desktop=gnome
sun.cpu.isalist=

その他

全部を書いているわけではないですが、またまとめたいものがあったら、順次追記していきます。

2018-05-13

JavaからLDAP/LDAPS接続(自己署名証明書の検証スルーコード付き)

最近、ちょっとLDAP…LDAPSでの接続とかをやってみたので、メモとして。

LDAPおよびLDAPSでの接続を、Javaから簡単な例で書いてみます。

こちらを参考に。

JNDI/LDAPサービス・プロバイダ

環境

JavaMaven

$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.16.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ mvn -v
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_171, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

LDAPサーバーについては、OpenLDAPのDockerイメージを使いました。

https://hub.docker.com/r/osixia/openldap/:title:osixia/openldap / Ducker Hub]

osixia/openldap / GitHub

Dockerイメージの起動コマンドは、こちらで。

$ docker run -it --rm --name openldap --env LDAP_ADMIN_PASSWORD="admin-password" --env LDAP_DOMAIN=test.example.com --env LDAP_TLS_VERIFY_CLIENT=try osixia/openldap:1.2.0

ドメインは「test.example.com」にして、adminのパスワードは「admin-password」へ、あとTLSの設定を。

TLSの設定は、Javaからの接続の時にちょっとハマったので。

TLS: can’t accept: No certificate was found.. #105

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.10.0</version>
            <scope>test</scope>
        </dependency>

LDAPサーバーには、エントリをひとつ追加。

test.ldif

dn: uid=user001,dc=test,dc=example,dc=com
objectClass: inetOrgPerson
objectClass: posixAccount
uid: user001
cn: テスト 太郎
sn: テスト
uidNumber: 10001
gidNumber: 10001
homeDirectory: /home/user001
userPassword: {SSHA}K1h08ZgBJQIInrqH1eerLG/I4jO2H9fh
description: My Test Account

追加。

$ ldapadd -f test.ldif -D "cn=admin,dc=test,dc=example,dc=com" -w admin-password
adding new entry "uid=user001,dc=test,dc=example,dc=com"


確認(Dockerコンテナ内で実行)。

## LDAP
$ ldapsearch -x -H ldap://localhost -b uid=user001,dc=test,dc=example,dc=com -D "cn=admin,dc=test,dc=example,dc=com" -w admin-password
# extended LDIF
#
# LDAPv3
# base <uid=user001,dc=test,dc=example,dc=com> with scope subtree
# filter: (objectclass=*)
# requesting: ALL
#

# user001, test.example.com
dn: uid=user001,dc=test,dc=example,dc=com
objectClass: inetOrgPerson
objectClass: posixAccount
uid: user001
cn:: 44OG44K544OIIOWkqumDjg==
sn:: 44OG44K544OI
uidNumber: 10001
gidNumber: 10001
homeDirectory: /home/user001
userPassword:: e1NIQX1hUnloRlJkSXJudFNzY2d1cXF6R0Q0MUIyNEk9
description: My Test Account

# search result
search: 2
result: 0 Success

# numResponses: 2
# numEntries: 1


## LDAPS
$ ldapsearch -x -H ldap://localhost -b uid=user001,dc=test,dc=example,dc=com -D "cn=admin,dc=test,dc=example,dc=com" -w admin-password
# extended LDIF
#
# LDAPv3
# base <uid=user001,dc=test,dc=example,dc=com> with scope subtree
# filter: (objectclass=*)
# requesting: ALL
#

# user001, test.example.com
dn: uid=user001,dc=test,dc=example,dc=com
objectClass: inetOrgPerson
objectClass: posixAccount
uid: user001
cn:: 44OG44K544OIIOWkqumDjg==
sn:: 44OG44K544OI
uidNumber: 10001
gidNumber: 10001
homeDirectory: /home/user001
userPassword:: e1NIQX1hUnloRlJkSXJudFNzY2d1cXF6R0Q0MUIyNEk9
description: My Test Account

# search result
search: 2
result: 0 Success

# numResponses: 2
# numEntries: 1

テストコードの雛形

実行は、テストコードで行っていきます。雛形は、こんな感じで。
src/test/java/org/liittlewings/ldap/example/SimpleLdapTest.java

package org.liittlewings.ldap.example;

import java.util.Hashtable;
import javax.naming.CommunicationException;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
import javax.net.ssl.SSLHandshakeException;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class SimpleLdapTest {
    // ここに、テストを書く!!
}

LDAP接続

LDAPへの接続コードは、こちらも合わせて参考に。

JavaでLDAP認証をやってみる - 眩しいサインを見ただろう

JavaでActiveDirectory検索を行う(AD認証) | 株式会社アースリンク

LDAP接続用コード | 寺田 佳央 - Yoshio Terada

adminと、追加ユーザーそれぞれで接続できれば、OKとしましょう。

で、作ったのはこんなコード。

    @Test
    public void ldapGettingStarted() throws NamingException {
        String url = "ldap://172.17.0.2:389";

        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        env.put(Context.SECURITY_PRINCIPAL, "cn=admin,dc=test,dc=example,dc=com");
        env.put(Context.SECURITY_CREDENTIALS, "admin-password");

        DirContext ctx = new InitialDirContext(env);

        try {
            SearchControls searchControls = new SearchControls();
            searchControls.setSearchScope(SearchControls.SUBTREE_SCOPE);
            NamingEnumeration<SearchResult> searchResult =
                    ctx.search("dc=test,dc=example,dc=com", "uid=user001", searchControls);

            assertThat(searchResult.hasMoreElements()).isTrue();
            assertThat(searchResult.nextElement().getAttributes().get("uid").get()).isEqualTo("user001");

            Hashtable<String, String> uenv = new Hashtable<>(env);
            uenv.put(Context.SECURITY_PRINCIPAL, "uid=user001,dc=test,dc=example,dc=com");
            uenv.put(Context.SECURITY_CREDENTIALS, "user-password");

            DirContext uctx = new InitialDirContext(uenv);

            // ok

            uctx.close();
        } finally {
            ctx.close();
        }
    }

はい。

LDAPS(自己署名証明書

続いて、LDAPS。ですが、証明書が正規のものではないため、SSLハンドシェイクで失敗します。

    @Test
    public void ldapsGettingStarted() throws NamingException {
        String url = "ldaps://172.17.0.2:636";

        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        env.put(Context.SECURITY_PRINCIPAL, "cn=admin,dc=test,dc=example,dc=com");
        env.put(Context.SECURITY_CREDENTIALS, "admin-password");

        assertThatThrownBy(() -> new InitialDirContext(env))
                .isInstanceOf(CommunicationException.class)
                .hasMessage("simple bind failed: 172.17.0.2:636")
                .hasCauseInstanceOf(SSLHandshakeException.class);
        // javax.net.ssl.SSLHandshakeException:
        // sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    }

さてどうしましょう、ということで、証明書をkeystoreに追加してもいいのですが、ここはコードで解決してみましょう。

java.naming.ldap.factory.socket」で、SocketFactoryを指定できるようなので、カスタムなSSLSocketFactoryを作成してこちらを使用するようにしてみます。

src/main/java/org/littlewings/ldap/example/LooseSSLSocketFactory.java

package org.littlewings.ldap.example;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;

public class LooseSSLSocketFactory extends SSLSocketFactory {
    SSLSocketFactory delegate;

    public LooseSSLSocketFactory() {
        try {
            SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null,
                    new X509TrustManager[]{
                            new X509TrustManager() {
                                @Override
                                public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                                }

                                @Override
                                public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                                }

                                @Override
                                public X509Certificate[] getAcceptedIssuers() {
                                    return null;
                                }
                            }
                    },
                    new SecureRandom());

            delegate = sslContext.getSocketFactory();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String[] getDefaultCipherSuites() {
        return delegate.getDefaultCipherSuites();
    }

    @Override
    public String[] getSupportedCipherSuites() {
        return delegate.getSupportedCipherSuites();
    }

    @Override
    public Socket createSocket(Socket socket, String s, int i, boolean b) throws IOException {
        return delegate.createSocket(socket, s, i, b);
    }

    @Override
    public Socket createSocket(String s, int i) throws IOException, UnknownHostException {
        return delegate.createSocket(s, i);
    }

    @Override
    public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException, UnknownHostException {
        return delegate.createSocket(s, i, inetAddress, i1);
    }

    @Override
    public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
        return delegate.createSocket(inetAddress, i);
    }

    @Override
    public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
        return delegate.createSocket(inetAddress, i, inetAddress1, i1);
    }

    public static SocketFactory getDefault() {
        return new LooseSSLSocketFactory();
    }
}

このSSLSocketFactoryを使うように、修正したコード。

    @Test
    public void ldapGettingStartedWithCustomSSLSocketFactory() throws NamingException {
        String url = "ldaps://172.17.0.2:636";

        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, url);
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        env.put(Context.SECURITY_PRINCIPAL, "cn=admin,dc=test,dc=example,dc=com");
        env.put(Context.SECURITY_CREDENTIALS, "admin-password");
        env.put("java.naming.ldap.factory.socket", "org.littlewings.ldap.example.LooseSSLSocketFactory");

        DirContext ctx = new InitialDirContext(env);

        try {
            SearchControls searchControls = new SearchControls();
            searchControls.setSearchScope(SearchControls.SUBTREE_SCOPE);
            NamingEnumeration<SearchResult> searchResult =
                    ctx.search("dc=test,dc=example,dc=com", "uid=user001", searchControls);

            assertThat(searchResult.hasMoreElements()).isTrue();
            SearchResult searchResult1 = searchResult.nextElement();
            assertThat(searchResult1.getAttributes().get("uid").get()).isEqualTo("user001");

            Hashtable<String, String> uenv = new Hashtable<>(env);
            uenv.put(Context.SECURITY_PRINCIPAL, "uid=user001,dc=test,dc=example,dc=com");
            uenv.put(Context.SECURITY_CREDENTIALS, "user-password");

            DirContext uctx = new InitialDirContext(uenv);

            // ok

            uctx.close();
        } finally {
            ctx.close();
        }
    }

追加したのは、この部分ですね。

        env.put("java.naming.ldap.factory.socket", "org.littlewings.ldap.example.LooseSSLSocketFactory");

LDAPに慣れていないのでだいぶてこずりましたが、とりあえずはこんなところで。

2018-05-11

Ubuntu Linux 16.04 LTSにMinikubeをインストールして、DriverにDockerを使用する

少しずつ、Kubernetesまわりの勉強を始めてみようかなということで。

Ubuntu Linux 16.04 LTSに、Minikubeをインストールして動かしてみます。

minikube

Minikubeというのは、Kubernetesをローカルで動かすためのソフトウェアですね。自分の手元のような環境だと、こういうのがありがたいです。

Minikube自体も前からあったのですが、Minikubeを使うためのソフトウェアがVirtualBoxなどで少し敬遠していたものの、
実はDockerでも動かすことができると聞いて、やってみようかなと。

では、進めていってみましょう。

環境

タイトルにも書いていますが、環境はUbuntu Linux 16.04 LTSで行っています。

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=16.04
DISTRIB_CODENAME=xenial
DISTRIB_DESCRIPTION="Ubuntu 16.04.4 LTS"

インストール

GitHubのREADME.mdを見ながら、インストール。

現時点での最新版はv0.26.1なのですが、うちの環境だと異様にCPUを使う、また2回目以降の挙動がどうにも変で、バージョンを下げて0.25.2を
使ってみることにします。幸い、このバージョンなら動くので。

Unable to start minikube v0.26 #2703

BUG - Minikube 0.26.0 - Ubuntu 17.10 - Unable to restart without error #2707

Minikubeのインストール。

$ curl -Lo minikube https://github.com/kubernetes/minikube/releases/download/v0.25.2/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/

確認。

$ minikube version
minikube version: v0.25.2

kubectlのインストール。

Install and Set Up kubectl

$ sudo apt-get update && sudo apt-get install -y apt-transport-https
$ curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
$ sudo sh -c 'cat <<EOF >/etc/apt/sources.list.d/kubernetes.list
deb http://apt.kubernetes.io/ kubernetes-xenial main
EOF'
$ sudo apt-get update
$ sudo apt-get install -y kubectl

確認。

$ kubectl version
Client Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.2", GitCommit:"81753b10df112992bf51bbc2c2f85208aad78335", GitTreeState:"clean", BuildDate:"2018-04-27T09:22:21Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Unable to connect to the server: net/http: TLS handshake timeout

起動

では、Minikubeを起動してみます。

こちらに沿って。

Linux Continuous Integration without VM Support

「minikube start」で、オプションに「--vm-driver=none」を使うことで、ドライバをDockerとして起動できます。

$ sudo minikube start --vm-driver=none
There is a newer version of minikube available (v0.26.1).  Download it here:
https://github.com/kubernetes/minikube/releases/tag/v0.26.1

To disable this notification, run the following:
minikube config set WantUpdateNotification false
Starting local Kubernetes v1.9.4 cluster...
Starting VM...
Getting VM IP address...
Moving files into cluster...
Downloading localkube binary
 163.02 MB / 163.02 MB [============================================] 100.00% 0s
 0 B / 65 B [----------------------------------------------------------]   0.00%
 65 B / 65 B [======================================================] 100.00% 0sSetting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...
Kubectl is now configured to use the cluster.
===================
WARNING: IT IS RECOMMENDED NOT TO RUN THE NONE DRIVER ON PERSONAL WORKSTATIONS
	The 'none' driver will run an insecure kubernetes apiserver as root that may leave the host vulnerable to CSRF attacks

When using the none driver, the kubectl config and credentials generated will be root owned and will appear in the root home directory.
You will need to move the files to the appropriate location and then set the correct permissions.  An example of this is below:

	sudo mv /root/.kube $HOME/.kube # this will write over any previous configuration
	sudo chown -R $USER $HOME/.kube
	sudo chgrp -R $USER $HOME/.kube
	
	sudo mv /root/.minikube $HOME/.minikube # this will write over any previous configuration
	sudo chown -R $USER $HOME/.minikube
	sudo chgrp -R $USER $HOME/.minikube 

This can also be done automatically by setting the env var CHANGE_MINIKUBE_NONE_USER=true
Loading cached images from config file.

「minikube start」後に、次に実行すべきコマンドが表示されるので、ここはこれにしたがっておきます。

$ sudo chown -R $USER.$USER $HOME/.kube
$ sudo chown -R $USER.$USER $HOME/.minikube

DeploymentとServiceの作成。

$ kubectl run hello-minikube --image=k8s.gcr.io/echoserver:1.4 --port=8080
deployment.apps "hello-minikube" created
$ kubectl expose deployment hello-minikube --type=NodePort
service "hello-minikube" exposed

Podの状態を確認してみます。

$ kubectl get pod
NAME                            READY     STATUS    RESTARTS   AGE
hello-minikube-c6c6764d-nvr9f   1/1       Running   0          13s

curlで動作確認。

$ curl $(minikube service hello-minikube --url)
CLIENT VALUES:
client_address=172.17.0.1
command=GET
real path=/
query=nil
request_version=1.1
request_uri=http://192.168.254.128:8080/

SERVER VALUES:
server_version=nginx: 1.10.0 - lua: 10001

HEADERS RECEIVED:
accept=*/*
host=192.168.254.128:31572
user-agent=curl/7.47.0
BODY:
-no body in request-

また、この時に動作しているDockerコンテナは、こんな感じです。

$ docker ps
CONTAINER ID        IMAGE                                         COMMAND                  CREATED              STATUS              PORTS               NAMES
3fe390c8654b        k8s.gcr.io/echoserver                         "nginx -g 'daemon of"   31 seconds ago       Up 30 seconds                           k8s_hello-minikube_hello-minikube-c6c6764d-8824q_default_b2a3da22-5538-11e8-908f-000c29471a6e_0
ac01183254be        gcr.io/google_containers/pause-amd64:3.0      "/pause"                 31 seconds ago       Up 31 seconds                           k8s_POD_hello-minikube-c6c6764d-8824q_default_b2a3da22-5538-11e8-908f-000c29471a6e_0
2cee91fc6848        k8s.gcr.io/k8s-dns-sidecar-amd64              "/sidecar --v=2 --lo"   About a minute ago   Up About a minute                       k8s_sidecar_kube-dns-54cccfbdf8-m572j_kube-system_9c815b99-5538-11e8-908f-000c29471a6e_0
403a57ae5136        k8s.gcr.io/k8s-dns-dnsmasq-nanny-amd64        "/dnsmasq-nanny -v=2"   About a minute ago   Up About a minute                       k8s_dnsmasq_kube-dns-54cccfbdf8-m572j_kube-system_9c815b99-5538-11e8-908f-000c29471a6e_0
a50ca1f382b4        k8s.gcr.io/k8s-dns-kube-dns-amd64             "/kube-dns --domain="   About a minute ago   Up About a minute                       k8s_kubedns_kube-dns-54cccfbdf8-m572j_kube-system_9c815b99-5538-11e8-908f-000c29471a6e_0
dfeb724bc721        e94d2f21bc0c                                  "/dashboard --insecu"   About a minute ago   Up About a minute                       k8s_kubernetes-dashboard_kubernetes-dashboard-77d8b98585-wxljr_kube-system_9c5d4ee4-5538-11e8-908f-000c29471a6e_0
6b753de7c34c        gcr.io/google_containers/pause-amd64:3.0      "/pause"                 About a minute ago   Up About a minute                       k8s_POD_kube-dns-54cccfbdf8-m572j_kube-system_9c815b99-5538-11e8-908f-000c29471a6e_0
299bcb3aab05        gcr.io/google_containers/pause-amd64:3.0      "/pause"                 About a minute ago   Up About a minute                       k8s_POD_kubernetes-dashboard-77d8b98585-wxljr_kube-system_9c5d4ee4-5538-11e8-908f-000c29471a6e_0
4dc66c85f911        gcr.io/k8s-minikube/storage-provisioner       "/storage-provisioner"   About a minute ago   Up About a minute                       k8s_storage-provisioner_storage-provisioner_kube-system_9bdd26ec-5538-11e8-908f-000c29471a6e_0
be231b0a8af7        gcr.io/google_containers/pause-amd64:3.0      "/pause"                 About a minute ago   Up About a minute                       k8s_POD_storage-provisioner_kube-system_9bdd26ec-5538-11e8-908f-000c29471a6e_0
1a3b22ed8c70        gcr.io/google-containers/kube-addon-manager   "/opt/kube-addons.sh"    About a minute ago   Up About a minute                       k8s_kube-addon-manager_kube-addon-manager-ikaruga-ubuntu_kube-system_c4c3188325a93a2d7fb1714e1abf1259_1
c5d0c624aefc        gcr.io/google_containers/pause-amd64:3.0      "/pause"                 About a minute ago   Up About a minute                       k8s_POD_kube-addon-manager-ikaruga-ubuntu_kube-system_c4c3188325a93a2d7fb1714e1abf1259_0

Dockerコンテナだけかと思いきや、ホスト側にもいろいろいたり。

$ ps -ef | grep kube | grep -v 'grep'
root      46637      1 25 01:37 ?        00:00:24 /usr/local/bin/localkube --dns-domain=cluster.local --node-ip=192.168.254.128 --generate-certs=false --logtostderr=true --enable-dns=false
root      46788  46772  0 01:37 ?        00:00:00 /bin/bash /opt/kube-addons.sh
root      47396  47360  0 01:38 ?        00:00:00 /kube-dns --domain=cluster.local. --dns-port=10053 --config-map=kube-dns --v=2
nobody    47602  47577  0 01:38 ?        00:00:00 /sidecar --v=2 --logtostderr --probe=kubedns,127.0.0.1:10053,kubernetes.default.svc.cluster.local.,5,A --probe=dnsmasq,127.0.0.1:53,kubernetes.default.svc.cluster.local.,5,A

削除

使い終わったら、ServiceとDeploymentを削除。

$ kubectl delete service hello-minikube
service "hello-minikube" deleted
$ kubectl delete deployment hello-minikube
deployment.extensions "hello-minikube" deleted

Minikubeも停止…

$ sudo minikube stop
Stopping local Kubernetes cluster...
Machine stopped.

なのですが、なぜか止まりません…。

最終的には、「minikube stop」を飛ばして削除です…。

$ sudo minikube delete

とりあえず、最低限は動かせた気はしますが、いろいろ怪しいです…。

これから、ちょっとずつ慣れて…いけるでしょうか?

0.26.1は、なにがダメだった?

うちだと、ログにこんな感じに出力されるうえ、CPUリソースを大量に消費します。

[preflight] Running pre-flight checks.
	[WARNING SystemVerification]: docker version is greater than the most recently validated version. Docker version: 17.12.1-ce. Max validated version: 17.03
	[WARNING Swap]: running with swap on is not supported. Please disable swap
	[WARNING FileExisting-ebtables]: ebtables not found in system path
	[WARNING FileExisting-ethtool]: ethtool not found in system path
	[WARNING FileExisting-socat]: socat not found in system path
	[WARNING FileExisting-crictl]: crictl not found in system path
Suggestion: go get github.com/kubernetes-incubator/cri-tools/cmd/crictl
Flag --admission-control has been deprecated, Use --enable-admission-plugins or --disable-admission-plugins instead. Will be removed in a future version.
[preflight] Some fatal errors occurred:
	[ERROR Port-10250]: Port 10250 is in use
[preflight] If you know what you are doing, you can make a check non-fatal with `--ignore-preflight-errors=...`
: running command: sudo /usr/bin/kubeadm init --config /var/lib/kubeadm.yaml --ignore-preflight-errors=DirAvailable--etc-kubernetes-manifests --ignore-preflight-errors=DirAvailable--data --ignore-preflight-errors=FileAvailable--etc-kubernetes-manifests-kube-scheduler.yaml --ignore-preflight-errors=FileAvailable--etc-kubernetes-manifests-kube-apiserver.yaml --ignore-preflight-errors=FileAvailable--etc-kubernetes-manifests-kube-controller-manager.yaml --ignore-preflight-errors=FileAvailable--etc-kubernetes-manifests-etcd.yaml --ignore-preflight-errors=Swap --ignore-preflight-errors=CRI 
.: exit status 2`

これがどうしても抜けられずに、0.25に下げたらいいよ、みたいなIssueを見かけたので、バージョンを下げたら確かに
起動しました。代わりに、「minikube stop」はなんか止められないのですが…。

VMwareの肥大化した仮想マシンを小さくする

最近、VMware仮想マシンを使っていて、「なんかディスクサイズ、大きくない?」ということに気付きまして。

仮想マシン上で使っているディスクサイズからすると、1.5倍くらいのディスクサイズを占めていて、これはちょっと
どうにかならないかなぁと調べたところ、次のコマンドで小さくできました。

$ sudo vmware-toolbox-cmd disk shrink /

ヘルプを見ると、こんな感じ。

$ vmware-toolbox-cmd help disk
disk: ディスク圧縮操作を実行
使用方法: vmware-toolbox-cmd disk <サブコマンド> [引数]

サブコマンド:
   list: 使用可能な場所を一覧表示
   shrink <場所>: 指定された場所のファイル システムをワイプおよび圧縮
   shrinkonly: すべてのディスクを圧縮
   wipe <場所>: 指定された場所のファイル システムをワイプ

「shrink」サブコマンドで、指定した場所のファイルシステムを圧縮するようで。

このコマンドを実行すると、圧縮中は仮想マシンが使えなくなるのですが、仮想マシン上のサイズ…とはさすがにいきませんが、
かなり小さくしてくれました。

自分の環境では、50Gくらいディスクを使っていた仮想マシンが、35Gくらいにはなってくれましたよ。

2018-05-03

Apache Kafkaで、実行中にBrokerを落とした時のProducer/Consumerの挙動を確認する

こちらの続きで、Apache Kafkaで、ProducerやConsumerを実行中にBrokerを落とした時にどうなるか、その挙動を見てみようと
思います。

Apache Kafkaで、実行中にBrokerを追加した時のProducer/Consumerの挙動を確認する - CLOVER

環境とお題

基本的には、前述のエントリと同じです。

利用する、Apache Kafkaのバージョンは1.1.0として、Apache ZooKeeperをひとつ、Apache KafkaのBrokerを以下のような
体系で用意します。

Apache ZooKepper … 172.20.0.2
Apache Kafka … 172.20.0.3〜

前回のエントリとは、IPアドレス体系が少し変わりました…。

Apache KafkaのBrokerは最初は3つで、途中で2つにします。

この時に、Consumerはそのまま動き続けるか?とかを見れたらいいなと。

環境も、前回と同じ。

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

サンプルプログラム

サンプルプログラムについても、前回のエントリと同じものを使います。

Maven依存関係はこちら。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

Producer。
src/main/java/org/littlewings/kafka/SimpleLoader.java

package org.littlewings.kafka;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;

        if (args.length > 1) {
            bootstrapServers = args[0];
            topicName = args[1];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName);

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }
    }
}

30秒おきに、適当にメッセージを30個放り込むProducerです。起動引数では、「bootstrap.servers」とTopic名を
もらうようにしています。

Consumer側。
src/main/java/org/littlewings/kafka/SimpleConsumer.java

package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;
        String groupId;

        if (args.length > 2) {
            bootstrapServers = args[0];
            topicName = args[1];
            groupId = args[2];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
            groupId = "my-group";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName+ ", " + groupId);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<Integer, String> consumer =
             new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000L);

                Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

                while (iterator.hasNext()) {
                    ConsumerRecord<Integer, String> record = iterator.next();
                    System.out.println(record.partition() + " / "  + record.value());
                }

                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }
}

こちらも、起動引数に「bootstrap.servers」とTopic名、Consumer Group名を受け取るようにしています。起動後は、ひたすら実行を
継続します。

出力するのは、メッセージを読み取ったパーティションのIDと、値です。

前回とまったく同じ。

Topicの作成とプログラム実行。

それでは、まずはTopicの作成。

$ bin/kafka-topics.sh --create --zookeeper 172.20.0.2:2181 --replication-factor 2 --partitions 5 --topic my-topic
Created topic "my-topic".

確認。

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5,4
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

プログラムの実行。Consumer Groupは、「my-group」とします。

## Producer
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.20.0.3:9092 my-topic'

## Consumer-1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-top my-group'

## Consumer-2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-top my-group'

## Consumer-3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-top my-group'

各Consumerの割り当て状況。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        2          7               12              5               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        3          6               12              6               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        0          6               12              6               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        1          6               12              6               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        4          6               12              6               consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779 /172.20.0.1     consumer-1

Producer側は30秒おきにメッセージをTopicに放り込みつつ

2018-05-03T23:55:47.847 sleeping...
2018-05-03T23:56:18.048 sleeping...

Consumer側では、メッセージが取り出されて標準出力に出力され続けます。

## Consumer-1
0 / message-200
0 / message-205
0 / message-210
1 / message-183
1 / message-188
1 / message-193


## Consumer-2
4 / message-157
4 / message-162
4 / message-167
4 / message-172
4 / message-177


## Consumer-3
2 / message-181
2 / message-186
3 / message-184
2 / message-191
2 / message-196
2 / message-201
2 / message-206
3 / message-189

まあ、動いています、と。

Brokerをダウンさせる

では、ここでBrokerをひとつ落としてみましょう。

$ kill [PID]

Topicの状態を見ると、idが4のBrokerがいなくなりました。

bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic'
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 4,5	Isr: 5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

ダウン前は、こちら。

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5,4
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

Consumer Groupにおける、Consumerの割り当ては、特に変わらなかったようです。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        2          78              78              0               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        3          78              78              0               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        0          78              78              0               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        1          78              78              0               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        4          78              78              0               consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779 /172.20.0.1     consumer-1

なお、BrokerおよびConsumerはふつうに動き続けていました。

## Consumer-1
0 / message-500
0 / message-505
0 / message-510
1 / message-483
1 / message-488
1 / message-493
1 / message-498


## Consumer-2
4 / message-427
4 / message-432
4 / message-437
4 / message-442
4 / message-447
4 / message-452


## Consumer-3
2 / message-491
2 / message-496
2 / message-501
2 / message-506
3 / message-484
3 / message-489
3 / message-494

リバランスする

ところで、Topicの状態を見るとBroker間でパーティションの割り当てが偏ったままです。

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 4,5	Isr: 5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

Leaderから4がいなくなったものの、レプリカにはまだ4のBrokerがいる状態になっています。

では、これをリバランスしましょう。

次の内容のJSONを作成。
topics-to-move.json

{
  "topics": [{ "topic": "my-topic" }],
  "version": 1
}

作成したJSONを元に、次のコマンドを実行。「--broker-list」に指定するBrokerは、残った3と5にしています。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list=3,5 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

で、「Proposed partition reassignment configuration」で表示されたJSONの内容をファイルにして
expand-cluster-reassignment.json

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

実行。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

再割り当て完了です。

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 3,5	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 3,5	Isr: 5,3
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

まとめ

ProducerやConsumerの実行中に、Brokerを落としてみましたが、今回はそのまま動作しました。ただ、時々Consumer側がなにも拾わなくなることがあり、ちょっと
怖い感じがします。

あと、Brokerが落ちてもリバランスが自動で行われるわけではないので、そこをリバランスしないままさらにもうひとつBrokerが落ちたりすると、データの分布に
よってはロストしたりするんでしょうね。

やっぱり、Nodeが落ちた時の挙動は気になるものですね…。

2018-04-15

Apache Kafkaで、実行中にBrokerを追加した時のProducer/Consumerの挙動を確認する

Apache Kafkaで、ProducerやConsumerを実行中にBrokerを追加した時にどうなるか、その挙動を見てみようと思います。

環境とお題

利用する、Apache Kafkaのバージョンは1.1.0とします。

Apache ZooKeeperをひとつ、Apache KafkaのBrokerを以下のような体系で用意します。

  • Apache ZooKepper … 172.21.0.2
  • Apache Kafka … 172.21.0.3〜

Apache KafkaのBrokerは最初は2つで、あとで3つに増やします。この時のバリエーションとして、

  • Brokerを追加してパーティションを追加
  • Brokerを追加して、パーティション配置を再割り当て

という感じでやってみようかと。

その他、環境的なところはこんな感じです。

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

サンプルプログラム

今回は、簡単なProducer、Consumerのプログラムを用意します。

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

Producer。
src/main/java/org/littlewings/kafka/SimpleLoader.java

package org.littlewings.kafka;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;

        if (args.length > 1) {
            bootstrapServers = args[0];
            topicName = args[1];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName);

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }
    }
}

30秒おきに、適当にメッセージを30個放り込むProducerです。

起動引数では、「bootstrap.servers」とTopic名をもらうようにしています。
ProducerConfig.METADATA_MAX_AGE_CONFIG(metadata.max.age.ms)については、後述。

Consumer側。
src/main/java/org/littlewings/kafka/SimpleConsumer.java

package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;
        String groupId;

        if (args.length > 2) {
            bootstrapServers = args[0];
            topicName = args[1];
            groupId = args[2];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
            groupId = "my-group";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName+ ", " + groupId);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<Integer, String> consumer =
             new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000L);

                Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

                while (iterator.hasNext()) {
                    ConsumerRecord<Integer, String> record = iterator.next();
                    System.out.println(record.partition() + " / "  + record.value());
                }

                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }
}

こちらも、起動引数に「bootstrap.servers」とTopic名、Consumer Group名を受け取るようにしています。起動後は、ひたすら実行を
継続します。

こんな感じのプログラムでスタート。

Brokerを追加してパーティションを追加してみる

では、最初にBrokerを追加してパーティションを追加してみるパターンから。

とりあえず、Apache KafkaのBrokerを2つ起動しておきます。

Topic作成。名前は「my-topic」、パーティション数は3、Replication Factorは2としておきました。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic
Created topic "my-topic".

describe。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3

各BrokerのIDが、3と4になっている(1からとかじゃない)のは気にしないでください…。

ここで、Consumerを3つ起動してみます。Consumer Group名は「my-group」で。

## 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

「bootstrap.servers」には、全部のBrokerを並べなくても良いようなので、今回はこちらを少なく指定してみました。

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

https://kafka.apache.org/11/documentation.html#newconsumerconfigs

これは、Producerにも同じことが書いています。
Producer Configs

Consumerの割り当てを確認。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          0               0               0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          0               0               0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        2          0               0               0               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

はい。

それでは、Producerを起動してみます。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic

30秒ごとにメッセージが30通ずつ送られ

2018-04-15T22:07:29.386 sleeping...

各Consumerがメッセージを受け取ります。

## 1
2 / message-3
2 / message-6
2 / message-9
2 / message-12
2 / message-15
2 / message-18
2 / message-21
2 / message-24
2 / message-27
2 / message-30


## 2
0 / message-2
0 / message-5
0 / message-8
0 / message-11
0 / message-14
0 / message-17
0 / message-20
0 / message-23
0 / message-26
0 / message-29

## 3
1 / message-1
1 / message-4
1 / message-7
1 / message-10
1 / message-13
1 / message-16
1 / message-19
1 / message-22
1 / message-25
1 / message-28

同じConsumer Groupなので、重複なく受け取っていますね。

それでは、ここでBrokerを追加してみます。

$ bin/kafka-server-start.sh -daemon config/server.properties

Brokerを追加しただけではなにも起こらないので、

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          50              50              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          50              50              0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        2          50              50              0               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

パーティションを追加してみましょう。5個まで増やしてみます。
Modifying topics

$ bin/kafka-topics.sh --zookeeper 172.21.0.2:2181 --topic my-topic --alter --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

LeaderやConsumerなどの割り当ては、このように。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 4	Leader: 5	Replicas: 5,3	Isr: 5,3


$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          80              80              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          80              80              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        2          80              80              0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        3          -               0               -               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        4          -               0               -               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

Consumer Group内での際割り当ては、割とすぐに行われます。

ただ、実際のProducerやConsumerが追加されたパーティションに反応するのには、少し時間がかかります。

最終的には、Consumerがちゃんとメッセージを読め、Producerも新しいパーティションにメッセージを送るようになります。

## 1
4 / message-244
4 / message-249
4 / message-254
4 / message-259
4 / message-264
4 / message-269
4 / message-274
4 / message-279
4 / message-284
4 / message-289
4 / message-294
4 / message-299


## 2
0 / message-252
0 / message-257
0 / message-262
0 / message-267
1 / message-250
1 / message-255
1 / message-260
1 / message-265
1 / message-270
1 / message-275
1 / message-280
1 / message-285
1 / message-290
1 / message-295
1 / message-300
0 / message-272
0 / message-277
0 / message-282
0 / message-287
0 / message-292
0 / message-297


## 3
2 / message-248
2 / message-253
2 / message-258
2 / message-263
2 / message-268
3 / message-246
3 / message-251
3 / message-256
3 / message-261
3 / message-266
2 / message-273
2 / message-278
2 / message-283
2 / message-288
2 / message-293
2 / message-298
3 / message-271
3 / message-276
3 / message-281
3 / message-286
3 / message-291
3 / message-296
3 / message-301

この「少し時間がかかる」と言った部分が「metadata.max.age.ms」です。

Producer Configs

New Consumer Configs

Producer、Consumerそれぞれに設定し、ここで設定された時間(ms)だけメタデータを保持するため、指定時間経過後に新しいLeaderやBrokerを検出します。

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

https://kafka.apache.org/11/documentation.html#producerconfigs

デフォルト値は「300000」(5分)なので、けっこう待ちます…。

なので、最初はパーティションを追加したりしても、Produerが新しいパーティションにメッセージを追加してくれなかったり、Consumerが読んでくれなかったり
するように見えてしまいます。

今回は、1分に縮めておきました。

        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

これで、「bootstrap.servers」に記載していないBrokerについてもProducerやConsumerが認識してくれること(あくまで起動時のシードであること)、
後からBrokerを追加(パーティションも追加)しても一定時間後に処理対象として認識してくれることがわかりました。

Brokerを追加して、パーティション配置を再割り当て

では、続いてBrokerを追加して、パーティション配置を再割り当てというパターンを。

ここまでの環境はいったん忘れて、再度Brokerを2つ起動した後から始めます。

Topicの作成。今回は、パーティション数を5とします。Brokerを追加して再割り当てするのみなので。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 5 --topic my-topic
Created topic "my-topic".

Leaderなどのマッピング。

bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,4	Isr: 3,4

先ほどのパターンと同じく、Consumerを3つ起動。Consumer Groupの名前は、「my-group」のままで。

## 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

Consumer Group内での、パーティションとConsumerの割り当てを確認。

$ bash -c 'bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          0               0               0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          0               0               0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          0               0               0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          0               0               0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          0               0               0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

Producerも起動します。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic

Producerによるメッセージの登録と、Consumerのメッセージの読み取りが始まります。

## 1
0 / message-5
0 / message-10
0 / message-15
0 / message-20
0 / message-25
0 / message-30
1 / message-3
1 / message-8
1 / message-13
1 / message-18
1 / message-23
1 / message-28


## 2
4 / message-2
4 / message-7
4 / message-12
4 / message-17
4 / message-22
4 / message-27


## 3
2 / message-1
2 / message-6
2 / message-11
2 / message-16
2 / message-21
2 / message-26
3 / message-4
3 / message-9
3 / message-14
3 / message-19
3 / message-24
3 / message-29

ここでBrokerを追加してみます。

$ bin/kafka-server-start.sh -daemon config/server.properties

この時点では、特に変化はありません。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,4	Isr: 3,4


$bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          24              24              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          24              24              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          24              24              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          24              24              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          24              24              0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

では、Brokerに対するパーティションの再割り当てを行ってみます。
Expanding your cluster

JSONファイルを作成。
topics-to-move.json

{
  "topics": [{ "topic": "my-topic" }],
  "version": 1
}

再分配のための、情報を作成。今回は、「--broker-list」ですべてのBrokerで再割り当てするようにしてみましょう。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list=3,4,5 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[4,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[5,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

「Proposed partition reassignment configuration」以下に表示された内容をJSONファイルに保存し
expand-cluster-reassignment.json

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[5,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

実行。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[4,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

再分配が完了。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 5,4	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 3,4	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 4	Replicas: 4,5	Isr: 4,5

Consumer Group内での割り当ては、変わらない感じです。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group'
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          90              90              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          90              90              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          90              90              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          90              90              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          90              90              0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

ところがですね、先ほどのProducerのコードだと

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }

このオペレーションを行うと次の実行でエラーになります。

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError (FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get (FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get (FutureRecordMetadata.java:29)
    at org.littlewings.kafka.SimpleLoader.main (SimpleLoader.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)

この場合、例えばこういう感じとかでメッセージを送ったら接続させ直す方がいいんでしょうか…。
*つなぎ直しているので、これでエラーにはならなくなります、が…

        KafkaProducer<Integer, String> producer =
            new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer());

        for (int i = 1; i < Integer.MAX_VALUE; i++) {
            producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

            if (i % 30 == 0) {
                System.out.println(LocalDateTime.now() + " sleeping...");
                TimeUnit.MINUTES.sleep(1L);

                producer.close();
                producer = new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer());
            }
        }

        producer.close();

今回は、単純にProducerを再起動します。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic'

Consumer側はBrokerの追加および、パーティションの再割り当てを行っても、そのままでメッセージが読めます。

## 1
0 / message-2
0 / message-7
0 / message-12
0 / message-17
0 / message-22
0 / message-27
1 / message-5
1 / message-10
1 / message-15
1 / message-20
1 / message-25
1 / message-30


## 2
4 / message-4
4 / message-9
4 / message-14
4 / message-19
4 / message-24
4 / message-29


## 3
2 / message-3
2 / message-8
2 / message-13
2 / message-18
2 / message-23
2 / message-28
3 / message-1
3 / message-6
3 / message-11
3 / message-16
3 / message-21
3 / message-26

Producer側だけは、ちょっと注意が必要な感じが…。まあ、気楽にパーティションの再割り当てなんて行わない方がよい、という気はしますけどね。

まとめ

Apache Kafkaを使って、ProducerやConsumerの実行途中にBrokerやパーティションの構成を変更してみました。

ProducerやConsumer側に反映されるまでラグがあったり、パーティション再割り当てをした時の挙動などいろいろ知らないことや遭遇したことがあったので、
いい確認にはなったかなと。