Tociyuki::Diary RSSフィード

tociyuki による Perl・Ruby・C++・C で書き散らしたコードを中心に、日常雑記も混在 : B  F  twitter  GitHub  CPAN  本館  公開鍵
 

2017年07月19日

[][]低解像度折れ線グラフの実線と点鎖線の認識の試み

ウェブ・ページに埋め込まれた棒グラフや折れ線グラフからデータを読み取りたいときがたまにあります。 例えば、 東北沖大地震の年の夏、 東京電力管内で電気の使用制限が求められた当初、 東京電力がウェブ・ページに棒グラフを掲載して電力使用量をアナウンスし始めました。 そのとき、 棒グラフの画像から使用量の数値へ変換する処理を書いて公開された方が現れ、 電力使用量 API 登場のニュースになりました。 それからまもなく数値データ表を東京電力のウェブ・ページからダウンロードできるようになり、 様々な電力使用量表示アプリケーションが生まれたものでした。

あの夏の経験から、 ウェブ・ページの作成者は棒グラフや折れ線グラフに合わせて元になる数値データ表をウェブ・ページから提供することはもちろんですが、 過渡期や数値データの提供が期待できないとき棒グラフや折れ線グラフから数値データ表を求める自動処理をウェブ・ページの購読者が欲しくなることがあるとわかります。

ところで、 棒グラフは画像処理のモルフォロジック・フィルタを利用することで数値データ読み取りのための前処理を組み立てることができます。 一方、 折れ線グラフでは、 同一色の一点鎖線と破線を実線から区別したいときがあり棒グラフに比べて難易度が上がります。 折れ線ごとに色分けを最初からしてあれば簡単に数値化できるのですが、 そのようなグラフは相変わらず少数です。 点線の認識へ高解像度画像用に複数の手法が発見されているので、 ウェブ・ページ用の低解像度画像の点線認識に利用できないものかと、 試行錯誤していたのですが、 どうしてもうまくいかず諦めることにしました。 低解像度では点と線の重なりが増え、 繰り返しパターンが壊れがちです。 局所的に点線・一点鎖線・二点鎖線を区別するのが難しくなります。 それどころか、 線の向きの認識が困難になり、 傾きを利用する手法が使えなくなります。 点線認識に傾きを利用する適応性モルフォロジック・フィルタが使いにくいだけでなく、 線と点をベクトル化してから傾きと距離を頼りに点線を組み立て直す手法もうまく動かせないようです。 低解像度画像には別の切り口が必要なようです。

そこで、 妥協して、 線と点の長さと出現パターンを問わず 1 ないし 2 ピクセルの隙間を開けてつながる点鎖線をひっくるめて認識することにしてみました。 実線の折れ線の認識と点鎖線のそれは隙間を考慮するかどうかの違いしかなく、 同じ方式が利用できます。 これで、 実線・点鎖線・破線を区別することがかろうじてできるようになりました。 折れ線同士の重なりがあるので、 最初に実線の折れ線を認識しておき、 認識できなかった残りから点鎖線を認識します。 最後に破線の重なっていない部分が残ります。

f:id:tociyuki:20170719132408p:image

認識を試すために使った画像は、 google 画像検索結果から認識が難しそうと感じた白黒のものを拾い、 画像サイズと解像度を変更せず、 目盛や凡例を塗りつぶしたものを使いました。 認識した実線の折れ線を赤色で、 点鎖線を緑色で上書きしたのが上の画像です。 残念ながら、 破線のピークの重なり位置 2 ヶ所に誤認識が生じています。

低解像度の実線の折れ線は、 Y 軸に平行な縦方向に伸びる幅 1 ピクセルの線分が連なっているものとして認識を左から右へとおこないます。 線分の一方に頭があるものとみなします。 例えるなら、 マッチ棒を縦向きに置いて、 横に並べていったようなものを考えます。 実線の折れ線グラフはペンを使って一筆書きして描きます。 それを考慮するために、 頭が上のときはペンを上へ動かしていることを表し、 下のときはペンを下へ動かしているものとします。 折れ線が重なっているときは、 途中に分岐が生じます。 そのため、 折れ線は右方向への伸び優先で探索していき、 分岐した先の長さを分岐ごとに求め、 最長のものを選ぶことにします。 伸び優先で分岐をそれぞれ辿るため、 試行錯誤が発生します。 そこで、 分岐した箇所からの伸びの長さをメモ化して動的計画法で計算量を抑えることにします。 最長が複数あるときは、 頭に近い方を選ぶことにします。

#include <cstdlib>
#include <iostream>
#include <vector>
#include <array>
#include <opencv2/opencv.hpp>

// 頭付き縦棒 (上の比喩のマッチ棒) を表します。
struct vline_value_type {
    int x, y, rows;     // (x,y) から (x,y+rows-1) までの線分を表します。
    int seg;            // 折れ線認識後の折れ線の種類を表します。
    int head;           // 負なら頭は上側、 正なら頭は下側、 ゼロは頭なしです。
    int len;            // メモ化のための長さの一時記録に使います。
};
// 縦棒をベクタに並べ、 ベクタの添字をマッチ棒の識別子にします。
typedef std::vector<vline_value_type> vline_type;
// x 座標の縦棒の探索のため、 x 座標の先頭の縦棒の位置の索引を作っておきます。
typedef std::vector<int> vcol_type;

// 分岐の開始マッチ棒の識別子と、 そこからの伸びの長さを表します。
// 実線折れ線では一本、 点鎖線では縦の連なりの数本で分岐します。
struct score_value_type {
    int first, second;  // 分岐位置です。 first から second までのマッチ棒の添字。
    int len;            // 伸びの長さです。
};
typedef std::array<score_value_type,6> score_type;

// 伸びのベクタ要素です。
struct path_value_type {
    int offset;         // 
    int head;
};
typedef std::vector<path_value_type> path_type;

void separate_vlines (cv::Mat& img, vcol_type& vcol, vline_type& vlines);
void detect_plot (vcol_type const& vcol, vline_type& vlines, int const gap, int const seg);

void digitize (int first, vcol_type const& vcol, vline_type& vlines, int const gap,
    path_type& path);
void path_append (int const first, int const second, int const head, path_type& path);
int combine_vgap (int& j, vline_type const& vlines, int const gap);
bool fill_score (int const x, int const itop, int const ibot,
    vcol_type const& vcol, vline_type const& vlines, int const gap,
    score_type& score);
void fill_score_len (vcol_type const& vcol, vline_type& vlines, int const gap,
    score_type& score);
void swap_score (score_type& score);
int index_score (score_type const& score);

プログラムのコマンドライン引数に指定した画像を読み取り二値化したら、 縦棒に分解し、 実線の折れ線、 点鎖線の折れ線を認識します。 認識の終わった縦棒を色分けして表示をします。 ここでは、 点鎖線の隙間を 3 ピクセルよりも少ないものとしてハードコードしています。

int
main (int argc, char* argv[])
{
    enum {
        SOLID_GAP = 0, SOLID_SEG = 1,
        DOTTED_GAP = 3, DOTTED_SEG = 2
    };

    if (argc != 2) {
        std::cerr << "usage: " << argv[0] << " IMAGE" << std::endl;
        return EXIT_FAILURE;
    }
    cv::Mat img_original = cv::imread (argv[1], cv::IMREAD_UNCHANGED);
    if (! img_original.data) {
        std::cerr << "error:imread:main:" << argv[1] << std::endl;
        return EXIT_FAILURE;
    }
    cv::Mat img_bin;
    cv::Mat img_gray;
    cv::cvtColor (img_original, img_gray, CV_BGR2GRAY);
    cv::threshold (img_gray, img_bin, 0, 255, cv::THRESH_BINARY|cv::THRESH_OTSU);
    img_bin = ~img_bin;

    vcol_type vcol;
    vline_type vlines;
    separate_vlines (img_bin, vcol, vlines);
    detect_plot (vcol, vlines, SOLID_GAP, SOLID_SEG);
    detect_plot (vcol, vlines, DOTTED_GAP, DOTTED_SEG);

    // 実線を赤色、 点鎖線を緑色に色分けして二値化画像に重ね書きします。
    cv::Mat img_output;
    cv::cvtColor (img_bin, img_output, CV_GRAY2BGR);
    for (int i = 0; i < vlines.size (); ++i) {
        if (vlines[i].seg == 0)
            continue;
        cv::Scalar line_color (0, 0, 255);
        if (vlines[i].seg == DOTTED_SEG) {
            line_color = {0, 255, 0};
        }
        cv::line (img_output, cv::Point (vlines[i].x, vlines[i].y),
            cv::Point (vlines[i].x, vlines[i].y + vlines[i].rows - 1),
            line_color);
    }
    cv::imshow ("IMAGE", img_output);
    cv::waitKey ();

    return EXIT_SUCCESS;
}

縦棒への分解は、 二値化画像を左から右へそれぞれの列ごと調べていき、 列ごとに上から下へと見ていき、 一繋がりの前面色のピクセルをまとめていきます。 一連のピクセルをまとめるやりかたは、 文字列の行から単語を認識するのと同じ方法で記述します。

void
separate_vlines (cv::Mat& img, vcol_type& vcol, vline_type& vlines)
{
    enum { LINEOUT, LINEIN };
    typedef unsigned char U8_T;
    for (int x = 0; x < img.cols; ++x) {
        vcol.push_back (vlines.size ());
        int top;
        for (int y = 0, state = LINEOUT; y <= img.rows; ++y) {
            U8_T const intensity = y >= img.rows ? 0 : img.at<U8_T> (y, x);
            if (intensity != 0) {
                if (state == LINEOUT) {
                    top = y;
                    state = LINEIN;
                }
            }
            else {
                if (state == LINEIN) {
                    vlines.push_back (vline_value_type {x, top, y - top, 0, 0, 0});
                    state = LINEOUT;
                }
            }
        }
    }
}

折れ線を一本認識します。 縦棒を左上から下向きに見ていき折れ線の伸びを digitize 関数で求めます。 長い折れ線の伸び path へ求まったら、 縦棒の seg メンバと head メンバに書き込みます。 digitize では縦棒の len メンバを破壊書き込みするので、 終わりに全縦棒の len メンバをゼロに戻します。

void
detect_plot (vcol_type const& vcol, vline_type& vlines, int const gap, int const seg)
{
    enum { THRESHOLD = 16 };
    path_type path;
    bool got = false;
    for (int x = vlines[0].x; ! got && x < vcol.size () - 1; ++x) {
        for (int i = vcol.at (x); i < vlines.size () && x == vlines.at (i).x; ++i) {
            path.clear ();
            digitize (i, vcol, vlines, gap, path);
            if (path.size () > THRESHOLD) {
                for (path_value_type const& item : path) {
                    vlines[item.offset].seg = seg;
                    vlines[item.offset].head = item.head;
                }
                got = true;
                break;
            }
        }
    }
    for (int i = 0; i < vlines.size (); ++i)
        vlines[i].len = 0;
}

添字が i の縦棒から、 折れ線を右へと伸ばせるだけ伸ばします。 現在の折れ線上の縦棒は添字が itop から ibot まで、 現在の縦棒の頭が上か下かは head 変数で追跡します。 head がゼロのときは頭が決まっていないことを示します。 分岐は 2 段階で求めます。 fill_score は右に隣接する縦棒を調べ、 分岐の起点を score 配列に求めます。 現在の縦棒の頭の上下から score を再配列し、 今度は分岐ごとに fill_score_len で右へ伸ばせる長さを求めます。 最も長く伸ばせる分岐を選んで、 折れ線のパスに追加して、 次の分岐を求めます。

void
digitize (int first, vcol_type const& vcol, vline_type& vlines, int const gap,
    path_type& path)
{
    int const limit = gap ? gap : 2;
    score_type score;
    int second = first;
    int itop = vlines.at (first).y;
    int ibot = combine_vgap (second, vlines, gap);
    int head = 0;
    while (first < vlines.size ()) {
        bool got = false;
        for (int dx = 1; ! got && dx < limit; ++dx) {
            int const x = vlines.at (first).x + dx;
            got = fill_score (x, itop, ibot, vcol, vlines, gap, score);
        }
        if (! got)
            break;
        if (+1 == head)
            swap_score (score);
        fill_score_len (vcol, vlines, gap, score);
        int weight = index_score (score);
        if (weight == score.size ())
            break;
        if (0 == head)
            head = -1;
        path_append (first, second, head, path);
        first = score[weight].first;
        second = score[weight].second;
        if (2 == weight || 3 == weight || 5 == weight)
            head = -head;
        itop = vlines.at (first).y;
        ibot = vlines.at (second).y + vlines.at (second).rows;
    }
    if (path.empty () || path.back ().offset != first) {
        path_append (first, second, head, path);
    }
}

折れ線の経路を記録します。 点鎖線のときは、 上下に複数の縦棒が並ぶときがあります。 そのとき、 頭の位置は端の縦棒だけに記録するようにしています。

void
path_append (int const first, int const second, int const head, path_type& path)
{
    if (head <= 0) {
        path.push_back (path_value_type {first, head});
        for (int k = first + 1; k <= second; ++k) {
            path.push_back (path_value_type {k, 0});
        }
    }
    else {
        for (int k = first; k <= second - 1; ++k) {
            path.push_back (path_value_type {k, 0});
        }
        path.push_back (path_value_type {second, head});
    }
}

点鎖線の隙間を挟んで上下に繋がる一連の縦棒を求めます。 縦棒を並べたベクタの添字 j から始めて、 同じ列で下へと隙間の許容量で飛び越えながら縦棒から縦棒へと移っていきます。 途中、 既に実線の印がついた縦棒があると、 飛び越えをストップします。

int
combine_vgap (int& j, vline_type const& vlines, int const gap)
{
    int const limit = gap ? gap - 1 : 0;
    int const x = vlines.at (j).x;
    int jbot = vlines.at (j).y + vlines.at (j).rows;
    if (limit) {
        for (int k = j + 1; k < vlines.size () && x == vlines.at (k).x; ++k) {
            int dist = vlines.at (k).y - vlines.at (k - 1).y - vlines.at (k - 1).rows;
            if (vlines.at (k).seg || dist > limit)
                break;
            j = k;
            jbot = vlines.at (k).y + vlines.at (k).rows;
        }
    }
    return jbot;
}

縦棒を並べたベクタの添字 itop から ibot の間の縦棒の右に隣接するカラム位置 x の jtop から jbot までの縦棒を求めて、 位置関係から score 配列へ割り振ります。 スコア 0 は、 縦棒の頭の右に隣接する場合、 1 は途中の分岐で最も頭側に隣接する場合、 2 は途中の分岐で最も尾側に隣接する場合、 3 は尾に隣接する場合とします。 スコア配列の先頭側の方が頭に近く隣接していることになります。 頭だけでなく尾側を考慮しなければならないのは、 ペンが折り返す縦棒があるためです。 スコア 4 と 5 は、 例えば頭が上側のとき、 頭の斜め上、尾の斜め下に接する場合で、 隣接の方を優先するため、 別扱いしています。

bool
fill_score (int const x, int const itop, int const ibot,
    vcol_type const& vcol, vline_type const& vlines, int const gap, score_type& score)
{
    bool filled = false;
    score.fill (score_value_type {0, 0, 0});
    for (int j = vcol.at (x); j < vlines.size () && x == vlines.at (j).x; ++j) {
        int const first = j;
        if (vlines.at (first).seg)
            continue;
        int const jtop = vlines.at (first).y;
        int const jbot = combine_vgap (j, vlines, gap);
        if (jbot < itop - gap)
            continue;
        if (ibot + gap < jtop)
            break;

        bool touch = true;
        if (jtop <= itop && itop < jbot)
            score[0] = score_value_type {first, j};
        else if (! score[1].first && itop - gap < jtop && jbot < ibot + gap)
            score[1] = score_value_type {first, j};
        else if (itop - gap < jtop && jbot < ibot + gap)
            score[2] = score_value_type {first, j};
        else if (jtop <= ibot - 1 && ibot - 1 < jbot)
            score[3] = score_value_type {first, j};
        else if (itop - gap <= jbot && jbot <= itop)
            score[4] = score_value_type {first, j};
        else if (ibot <= jtop && jtop <= ibot + gap)
            score[5] = score_value_type {first, j};
        else
            touch = false;

        if (touch)
            filled = true;
    }
    return filled;
}

動的計画法で、 分岐ごとに右へ折れ線を伸ばせる長さを求めます。 既に長さが記録してあるときは、 それを返します。

void
fill_score_len (vcol_type const& vcol, vline_type& vlines, int const gap,
    score_type& score)
{
    for (int w = 0; w < score.size (); ++w) {
        if (score[w].first) {
            if (! vlines[score[w].first].len) {
                path_type tail;
                digitize (score[w].first, vcol, vlines, gap, tail);
                vlines[score[w].first].len = tail.size ();
            }
            score[w].len = vlines[score[w].first].len;
        }
    }
}

頭が下側のとき、 スコア配列の上下関係はひっくり返します。

void
swap_score (score_type& score)
{
    std::swap (score[0], score[3]);
    std::swap (score[1], score[2]);
    std::swap (score[4], score[5]);
}

スコア配列から適切と思われる分岐を一つ選びます。 右へ最も長く伸ばせる分岐であること、 同じ長さに伸ばせるときは頭に近い方であること、 が判定基準です。

int
index_score (score_type const& score)
{
    int max_len = 0;
    int max_len_at = score.size ();
    for (int k = 0; k < score.size (); ++k) {
        if (score[k].first != 0) {
            if (max_len < score[k].len) {
                max_len = score[k].len;
                max_len_at = k;
            }
        }
    }
    return max_len_at;
}

2017年06月21日

[]閉包表による木構造

SQL で木構造を扱う方法の一つ、 閉包表を SQLite3 をターゲットとして試してみます。 閉包表 (Closure Table Tree Encoding) は、 ノードの主キーと、 そのノードを根とする部分木中の主キーの集合を関連付けます。 この関連付けを逆写像で眺めると、 経路列挙をカラムに文字列で格納するのではなく、 別の多対多の表に切り出したものになります。 そのため、 入れ子集合、 経路列挙の 2 つの視点から木構造を利用できるようになっています。 閉包表の名称でこの手法の利用例が説明されだしたのは 2010 年ぐらいからです。

参考

The simplest(?) way to do tree-based queries in SQL (dirtSimple.org)

Moving Subtrees in Closure Table Hierarchies - Percona Database Performance Blog

SQLで木と階層構造のデータを扱う(2)―― 経路列挙モデル

入れ子集合と経路の 2 面性を扱える閉包表ですけど、 今回は主に経路列挙の視点から木を扱ってみます。 閉包表で経路を列挙するときは、 葉から根へ向けて、 言い換えると子孫から祖先へ向けるやりかたが主流になっています。 閉包表には、 着目しているノードの主キー descendant に対して、 そのノードから根までの経路の途中のノードの主キー ancestor を列挙します。 閉包表は入れ子集合でもあるので、 経路の順番を閉包表に記入しなくてもその都度計算可能ですけど、 それならば最初から経路の順番を depth に書き込む方が賢いやりかただと納得できます。 もっとも、 その代償で、 削除と挿入時に depth をメンテナンスするやりかたを考えるはめになります。 この文書の目的の一つは depth の素朴なメンテナンスのやりかたの備忘録を残しておくことでもあります。

CREATE TABLE IF NOT EXISTS entries (
    id INTEGER NOT NULL UNIQUE PRIMARY KEY
   ,body TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS closure (
    descendant INTEGER NOT NULL REFERENCES entries (id)
   ,ancestor INTEGER NOT NULL REFERENCES entries (id)
   ,depth INTEGER NOT NULL CHECK (depth >= 0)
   ,PRIMARY KEY (descendant ASC, ancestor ASC)
);

主キーによる経路列挙と同じで、 経路には根の主キーとノードの主キーまでを列挙します。 depth は葉をゼロ、 根を深さになるように逆順に付けておくのが通例になっています。 逆順なのは、 ある主キーの直下にある子の主キーをリストアップするとき depth が 1 の行で探せば良いためです。

BEGIN TRANSACTION;

-- 主キーが 1 のエントリを経路 /1/ で作ります
INSERT INTO entries (id, body) VALUES (1, 'A');
INSERT INTO closure (descendant, ancestor, depth) VALUES (1, 1, 0);
-- 主キーが 2 のエントリを経路 /1/2/ で作ります
INSERT INTO entries (id, body) VALUES (2, 'B');
INSERT INTO closure (descendant, ancestor, depth) VALUES (2, 1, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (2, 2, 0);
-- 主キーが 3 のエントリを経路 /1/3/ で作ります
INSERT INTO entries (id, body) VALUES (3, 'C');
INSERT INTO closure (descendant, ancestor, depth) VALUES (3, 1, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (3, 3, 0);
-- 主キーが 4 のエントリを経路 /1/3/4/ で作ります
INSERT INTO entries (id, body) VALUES (4, 'D');
INSERT INTO closure (descendant, ancestor, depth) VALUES (4, 1, 2);
INSERT INTO closure (descendant, ancestor, depth) VALUES (4, 3, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (4, 4, 0);
-- 主キーが 5 のエントリを経路 /1/3/5/ で作ります
INSERT INTO entries (id, body) VALUES (5, 'E');
INSERT INTO closure (descendant, ancestor, depth) VALUES (5, 1, 2);
INSERT INTO closure (descendant, ancestor, depth) VALUES (5, 3, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (5, 5, 0);
-- 主キーが 6 のエントリを経路 /1/3/6/ で作ります
INSERT INTO entries (id, body) VALUES (6, 'F');
INSERT INTO closure (descendant, ancestor, depth) VALUES (6, 1, 2);
INSERT INTO closure (descendant, ancestor, depth) VALUES (6, 3, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (6, 6, 0);
-- 主キーが 7 のエントリを経路 /1/3/4/7/ で作ります
INSERT INTO entries (id, body) VALUES (7, 'G');
INSERT INTO closure (descendant, ancestor, depth) VALUES (7, 1, 3);
INSERT INTO closure (descendant, ancestor, depth) VALUES (7, 3, 2);
INSERT INTO closure (descendant, ancestor, depth) VALUES (7, 4, 1);
INSERT INTO closure (descendant, ancestor, depth) VALUES (7, 7, 0);

COMMIT;

ルートを見つけるには、 主キーを 1 に決め打ちしていると楽ですが、 必ずしもそうでないときは経路の先頭にしか現れないことを利用します。

SELECT *
  FROM entries
  WHERE 1 == (SELECT COUNT(*) FROM closure WHERE descendant = id);

-- => 1|A

葉を探すときは、 経路の末尾にしか現れないことを利用します。

SELECT * 
  FROM entries
  WHERE 1 == (SELECT COUNT(*) FROM closure WHERE ancestor = id);

-- => 2|B
--    5|E
--    6|F
--    7|G

descendant でグループを作ることで、 ノードの深さと経路を列挙することができます。 SQLite3 の方言 GROUP_CONCAT 集計関数は、 本稿執筆時点の最新版 3.19.3 でも ancestor の順番を depth を使って並べ替えることができないので、 これまた SQLite3 の方言である FROM SELECT 句を使って depth の降順で並べ替えてからグループ化をおこないます。 GROUP_CONCAT 集計関数に相当する方言に、 Oracle は LIST_AGG、 MySQL は group_concat、 PostgreSQL は string_agg をそれぞれ持っています。 SQLite3 以外では、 ORDER BY を関数の引数に修飾可能で、 下のような FROM にサブクエリを指定する非標準記述は不要です。

SELECT id, body
      ,MAX(depth) AS lev
      ,GROUP_CONCAT(ancestor) AS path
    FROM (SELECT id, body, descendant, ancestor, depth
            FROM entries JOIN closure ON descendant = id
            ORDER BY depth DESC)
    GROUP BY descendant
    ORDER BY path;

-- => 1|A|0|1
--    2|B|1|1,2
--    3|C|1|1,3
--    4|D|2|1,3,4
--    7|G|3|1,3,4,7
--    5|E|2|1,3,5
--    6|F|2|1,3,6

ノードの深さと quote zeroblob ハック を使ってインデント表現もできなくはありません。 制限として、 path を文字列順に並べているので、 桁数の異なる id が混在しているときは、 インデント表示が崩れることがあります。

--      quote(zeroblob(1))         => X'00'
-- trim(quote(zeroblob(1)), 'X''') => 00
SELECT replace(trim(quote(zeroblob(lev)), 'X'''), '00', '  ') || body
    FROM (SELECT id, body
          ,MAX(depth) AS lev
          ,GROUP_CONCAT(ancestor) AS path
        FROM (SELECT id, body, descendant, ancestor, depth
                FROM entries JOIN closure ON descendant = id
                ORDER BY depth DESC)
        GROUP BY descendant
        ORDER BY path);

-- => A
--      B
--      C
--        D
--          G
--        E
--        F

直属の親から見た子を取得するには、 depth が 1 になっている閉包表の行を利用します。 子供がない親では、 子のカラムには NULL が返ります。

SELECT P.id, P.body, C.id, C.body
    FROM entries AS P LEFT OUTER JOIN entries AS C
        ON P.id == (SELECT ancestor
                        FROM closure
                        WHERE descendant = C.id AND depth = 1);

-- => 1|A|2|B
--    1|A|3|C
--    2|B||
--    3|C|4|D
--    3|C|5|E
--    3|C|6|F
--    4|D|7|G
--    5|E||
--    6|F||
--    7|G||

直属の子から見た親を取得するには、 外部結合をひっくり返します。 根の親は NULL になります。

SELECT C.id, C.body, P.id, P.body
    FROM entries AS C LEFT OUTER JOIN entries AS P
        ON P.id == (SELECT ancestor
                        FROM closure
                        WHERE descendant = C.id AND depth = 1);

-- => 1|A||
--    2|B|1|A
--    3|C|1|A
--    4|D|3|C
--    5|E|3|C
--    6|F|3|C
--    7|G|4|D

部分木のノードを列挙するには、 閉包表は部分木そのものでもあるので、 表示の仕方に細工をするだけです。

SELECT R.id, R.body, C.id, C.body
    FROM entries AS R JOIN entries AS C JOIN closure
        ON ancestor = R.id AND descendant = C.id
    WHERE R.id != C.id
    ORDER BY R.id;

-- => 1|A|2|B
--    1|A|3|C
--    1|A|4|D
--    1|A|5|E
--    1|A|6|F
--    1|A|7|G
--    3|C|4|D
--    3|C|5|E
--    3|C|6|F
--    3|C|7|G
--    4|D|7|G

葉を削除するときは、 閉包の descendant が主キーに一致する行を削除します。

BEGIN TRANSACTION;

-- 葉 5|E|2|1,3,5 を削除します。
DELETE FROM closure WHERE descendant = 5;
DELETE FROM entries WHERE id = 5;

-- => 1|A|0|1
--    2|B|1|1,2
--    3|C|1|1,3
--    4|D|2|1,3,4
--    7|G|3|1,3,4,7
--    6|F|2|1,3,6

--COMMIT;
ROLLBACK;

閉包表は部分木を表しているので、 部分木を葉までごっそりと削除するのは簡単です。 例えば、 4 を削除すると 7 も一緒に削除します。 結果は経路列挙表示にしています。

BEGIN TRANSACTION;

-- ノード 4|D|2|1,3,4 と
--     葉 7|G|3|1,3,4,7 を同時に削除します。

DELETE FROM entries
    WHERE id IN (SELECT descendant FROM closure WHERE ancestor = 4);

DELETE FROM closure
    WHERE descendant IN (SELECT descendant FROM closure WHERE ancestor = 4);

-- => 1|A|0|1
--    2|B|1|1,2
--    3|C|1|1,3
--    5|E|2|1,3,5
--    6|F|2|1,3,6

--COMMIT;
ROLLBACK;

木の途中のノードを削除して中抜きするとき、 経路情報の depth を更新するのに一手間かかります。 更新さえしてしまえば、 閉包表からの削除は単純作業です。 例えば、 祖父母を親代わりにするには、 根から祖父母のノードまでの経路の depth をすべて 1 減じます。 なお、 下の UPDATE 文が MySQL でエラーになるときは、 「3 つの closure 表の自己結合」を使うことで動かすことができます。

BEGIN TRANSACTION;

-- ノード 3|C|1|1,3 を中抜きします。
-- closure VALUES (3, 1, 1) => delete
-- closure VALUES (3, 3, 0) => delete
--
-- closure VALUES (4, 1, 2) => closure VALUES (4, 1, 1) depth を 1 減じます。
-- closure VALUES (4, 3, 1) => delete
-- closure VALUES (4, 4, 0) 変更なし
--
-- closure VALUES (7, 1, 3) => closure VALUES (7, 1, 2) depth を 1 減じます。
-- closure VALUES (7, 3, 2) => delete
-- closure VALUES (7, 4, 1) 変更なし
-- closure VALUES (7, 7, 0) 変更なし
-- 等

UPDATE closure
    SET depth = depth - 1
    WHERE descendant IN (SELECT descendant FROM closure WHERE ancestor = 3)
      AND ancestor NOT IN (SELECT descendant FROM closure WHERE ancestor = 3);

DELETE FROM closure
    WHERE descendant = 3 OR ancestor = 3;

DELETE FROM entries
    WHERE id = 3;

-- => 1|A|0|1
--    2|B|1|1,2
--    4|D|1|1,4
--    7|G|2|1,4,7
--    5|E|1|1,5
--    6|F|1|1,6

--COMMIT;
ROLLBACK;

親 3 を削除して、 代理で子供の一人 4 が親代わりになるときは、 4 の部分木を一段上へ移動します。 移動したら 3 を削除し、 ancestor が 3 のまま残っている閉包表の行を 4 へ書き直します。

BEGIN TRANSACTION;

-- 部分木 4|D|2|1,3,4 を 4|D|1|1,4 に移動します。
-- closure VALUES (4, 1, 2) => delete => insert => closure VALUES (4, 1, 1)
-- closure VALUES (4, 3, 1) => delete
-- closure VALUES (4, 4, 0) 変更なし
--
-- closure VALUES (7, 1, 3) => delete => insert => closure VALUES (7, 1, 2)
-- closure VALUES (7, 3, 2) => delete
-- closure VALUES (7, 4, 1) 変更なし
-- closure VALUES (7, 7, 0) 変更なし
DELETE FROM closure
    WHERE descendant IN (SELECT descendant FROM closure WHERE ancestor = 4)
      AND ancestor NOT IN (SELECT descendant FROM closure WHERE ancestor = 4);

INSERT INTO closure
    SELECT D.descendant, A.ancestor, A.depth + D.depth + 1
        FROM closure AS A JOIN closure AS D
        WHERE A.descendant = (SELECT ancestor FROM closure WHERE descendant = 3 AND depth = 1)
          AND D.ancestor = 4;

-- 3|C|1|1,3 を削除します。
DELETE FROM closure
    WHERE descendant = 3;

-- 5|E|2|1,3,5 を 5|E|2|1,4,5 へ
-- 6|F|2|1,3,6 を 6|F|2|1,4,6 へ書き直します。
UPDATE closure
    SET ancestor = 4
    WHERE ancestor = 3;

-- => 1|A|0|1
--    2|B|1|1,2
--    4|D|1|1,4
--    5|E|2|1,4,5
--    6|F|2|1,4,6
--    7|G|2|1,4,7

--COMMIT;
ROLLBACK;

葉を追加するとき、 閉包表の親から根までの ancestor を受け継いで、 depth に 1 を加えた行を追加します。 さらに葉自身の行を閉包表へ追加します。

BEGIN TRANSACTION;

-- 2|B|1|1,2 の直下に葉 8|H|2|1,2,8 を追加します。
INSERT INTO entries VALUES (8, 'H');
INSERT INTO closure
    SELECT 8, ancestor, depth + 1 FROM closure WHERE descendant = 2
    UNION ALL
    SELECT 8, 8, 0;

-- => 1|A|0|1
--    2|B|1|1,2
--    8|H|2|1,2,8
--    3|C|1|1,3
--    4|D|2|1,3,4
--    7|G|3|1,3,4,7
--    5|E|2|1,3,5
--    6|F|2|1,3,6

--COMMIT;
ROLLBACK;

経路の途中にノードを追加するときは、 まず葉として追加します。 例えばノード 4 とノード 6 の直上にノード 8 を挿入するときは、 4 の直上の親に葉 8 を追加します。 その後のやりかたはいくつか考えられますが、 ここでは使いまわしが効く方法を採用し、 ノード 4 の部分木をノード 8 の下へ移動することにします。 続いて、 ノード 6 の部分木もノード 8 の下へ移動します。 ノード 4 の部分木を移動するには、 ノード 4 を ancestor に含むすべての経路に対して、 経路の先頭からノード 4 の直前までを削除してから、 ノード 8 の直下に根までの経路を作り直します。

BEGIN TRANSACTION;

-- 4|E|2|1,3,4 と 6|F|2|1,3,6 の上に 8|H を追加します。
INSERT INTO entries VALUES (8, 'H');
INSERT INTO closure
    SELECT 8, ancestor, depth + 1
        FROM closure
        WHERE descendant = (SELECT ancestor FROM closure WHERE descendant = 4 AND depth = 1)
    UNION ALL
    SELECT 8, 8, 0;

-- 4|E|2|1,3,4 をノード 3 の直下から、 ノード 8 の直下へ移動して 4|E|3|1,3,8,4| に作り直します。
-- そのとき同時に 7|G|3|1,3,4,7 もノード 8 の下へ移動させて 7|G|4|1,3,8,4,7 にします。
DELETE FROM closure
    WHERE descendant IN (SELECT descendant FROM closure WHERE ancestor = 4)
      AND ancestor NOT IN (SELECT descendant FROM closure WHERE ancestor = 4);

INSERT INTO closure
    SELECT D.descendant, A.ancestor, A.depth + D.depth + 1
        FROM closure AS A JOIN closure AS D
        WHERE A.descendant = 8
          AND D.ancestor = 4;

-- 6|E|2|1,3,6 をノード 3 の直下から、 ノード 8 の直下へ移動して 6|E|3|1,3,8,6| に作り直します。
DELETE FROM closure
    WHERE descendant IN (SELECT descendant FROM closure WHERE ancestor = 6)
      AND ancestor NOT IN (SELECT descendant FROM closure WHERE ancestor = 6);

INSERT INTO closure
    SELECT D.descendant, A.ancestor, A.depth + D.depth + 1
        FROM closure AS A JOIN closure AS D
        WHERE A.descendant = 8
          AND D.ancestor = 6;

-- => 1|A|0|1
--    2|B|1|1,2
--    3|C|1|1,3
--    5|E|3|1,3,5
--    8|H|2|1,3,8
--    4|D|2|1,3,8,4
--    7|G|3|1,3,8,4,7
--    6|F|3|1,3,8,6

--COMMIT;
ROLLBACK;

2017年06月16日

[]メッセージ・ヘッダ風のフロントマター

Markdown の前にブログ生成等のふるまいを記述するフロントマターには、 YAML をはじめとして、 いろんな形式が使われていますが、 メッセージ・ヘッダ風の簡略なものでも良いのではないかとスキャナを書いてみました。 YAML はスラッシュ 3 つで区切りますが、 ヘッダ風ではスラッシュ 2 つで区切ることにします。 MIME マルチパートの区切りがスラッシュ 2 つで始まることと、 一見 YAML のように見えることにひっかけてあります。

--
layout : default
uri    : frontmatter/index.html
--
メッセージ・ヘッダ風のフロントマター
====================================

一見 YAML フロトマターのように見えますが、 YAML ではありません。

スキャンに成功すると、 m_header メンバに名前と値を格納します。 m_size メンバはフロントマターのバイト数で、 この分をスキップすると本体の Markdown を取り出すことができます。

#include <map>
#include <string>

struct frontmatter_type {
    std::map<std::string,std::string> m_header;
    std::size_t m_size;

    bool scan (std::string const& src);
};

フラットなデータ記述なので、 まず、 Perl 互換正規表現で記述します。 通常のメッセージ・ヘッダとは異なり、 区切り記号を使っているので、 空行があっても良いことにしておきます。

qr{
    --\n+
    (?:[A-Za-z0-9][A-Za-z0-9_.~-]*[ ]*:[ ]*[^\n]*\n
       (?:(?:[ ]+[^\n]*)?\n)* )*
    --\n
}msx;

これから遷移表を作ります。 開始状態は S2 です。

S2 : '-' S3
S3 : '-' S4
S4 : "\n" S5
S5 : [A-Za-z0-9] S6 | "\n" S5
S6 : [A-Za-z0-9_.~-] S6 | ' ' S7 | ':' S8
S7 : ' ' S7 | ':' S8
S8 : [\x21-0xff] S9 | "\n" Sa | ' ' S8
S9 : [\x20-0xff] S9 | "\n" Sa
Sa : '-' Sb | [A-Za-z0-9] S6 | "\n" Sa | ' ' S9
Sb : '-' Sc
Sc : "\n" Sd
Sd : MATCH

遷移表から BASE-CHECK 配列 RULE を作成し、 m_headerm_size へ書き込むアクションを追加します。 RULE は 16 進数 3 桁の並びで、 桁は上からアクション番号、 遷移先状態番号、 遷移元状態番号になっています。 遷移元状態番号が state に一致すると状態遷移することができます。

static inline unsigned int
rule_exist (unsigned int const rule[], int const nrule, int const state, int const i)
{
    return 0 <= i && i < nrule && (rule[i] & 0x00f) == state;
}

bool
frontmatter_type::scan (std::string const& src)
{
    enum { NRULE = 31 };
    static const char CODE[] =
      // @ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_ !"#$%&'()*+,-./0123456789:;<=>?
        "@@@@@@@@@@D@@@@@@@@@@@@@@@@@@@@@E@@@@@@@@@@@@BG@CCCCCCCCCCF@@@@@"
      // @ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~
        "@CCCCCCCCCCCCCCCCCCCCCCCCCC@@@@G@CCCCCCCCCCCCCCCCCCCCCCCCCC@@@G";
    static const int BASE[] = {
        0-2, 1-2, 2-4, 3-3, 5-2, 11-5, 13-2, 19-2, 25-2, 7-2, 29-4, 30-1
    };
    static const unsigned int RULE[NRULE] = {
        0x032, 0x043, 0x054, 0x165, 0x155, 0x266, 0x266, 0x0cb, 0x276, 0x286,
        0x266, 0x077, 0x087, 0x398, 0x398, 0x3a8, 0x388, 0x398, 0x398, 0x499,
        0x499, 0x4a9, 0x499, 0x499, 0x499, 0x5ba, 0x56a, 0x5aa, 0x59a, 0x0dc,
        0x61d,
    };
    m_header.clear ();
    std::size_t h_first = 0, h_second = 0;
    std::size_t v_first = 0, v_second = 0;
    int state = 2;
    for (std::size_t pos = 0; 1 < state && pos < src.size (); ++pos) {
        int const ch = static_cast<unsigned char> (src[pos]);
        int const code = ((8 == state || 9 == state) && ' ' < ch) ? 3 : ch < 127 ? CODE[ch] - '@' : 0;
        int const jump = BASE[state - 2] + code;
        int const match = BASE[state - 2] + 1;
        unsigned int const rule = rule_exist (RULE, NRULE, state, jump)  ? RULE[jump]
                                : rule_exist (RULE, NRULE, state, match) ? RULE[match]
                                : 0;
        state = (rule & 0x0f0) >> 4;
        switch ((rule & 0xf00) >> 8) {
        case 1:
            h_first = h_second = pos;
            break;
        case 2:
            h_second = pos;
            break;
        case 3:
            v_first = v_second = pos;
            break;
        case 4:
            if (' ' < ch)
                v_second = pos + 1;
            break;
        case 5:
            if (' ' < ch) {
                std::string h = src.substr (h_first, h_second - h_first);
                m_header[h] = src.substr (v_first, v_second - v_first);
                h_first = h_second = pos;
            }
            break;
        case 6:
            m_size = pos;
            break;
        }
    }
    if (1 != state) {
        m_header.clear ();
        m_size = 0;
    }
    return 1 == state;
}