Akka Typed JavaDSLでConcurrent Theta Sketchする

Akka Typed JavaDSLとDataSketchesをとりあえず使ってみるだけの備忘録。DataSketchesの中でもTheta Sketch Frameworkを使用する。今回やっていることは以下とほぼ同じ。

Concurrent Theta Sketchは自力で書かなくても UploadSketchBuilder#buildSharedUploadSketchBuilder#buildLocalが公式に提供されているので、実際にやる場合はこちらを使うべき。

まず、gradle経由でakka typedを使うには、最低限scalaプラグインを適用してやる必要がある。更に適当なAkka Typedの依存と、DataSketchesの依存を追加する。

plugins {
    id 'java'
    id 'scala'
}
dependencies {
    compile group: 'org.apache.datasketches', name: 'datasketches-java', version: '1.2.0-incubating'
    compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.13', version: '2.6.4'
}

ローカルスケッチを更新するためのSketchActorを用意。単一のローカルスケッチをスレッドセーフに更新する。また、定期的にコンパクションしたローカルスケッチを、後述するSketchUnionActorに送信する。

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.UpdateSketch;

import java.time.Duration;

public class SketchActor extends AbstractBehavior<SketchActor.Protocol> {

    public interface Protocol {}

    public static final class SketchEvent implements Protocol {
        private final long value;
        public SketchEvent(long value) {
            this.value = value;
        }
        public long getValue() {
            return value;
        }
    }

    private static final class PersistRequest implements Protocol {
        private PersistRequest() {}
        private final static PersistRequest instance = new PersistRequest();
        public static PersistRequest getInstance() {
            return instance;
        }
    }

    private final UpdateSketch updateSketch;
    private final ActorRef<SketchUnionActor.UnionSketch> unionActorRef;

    private SketchActor(
            ActorContext<Protocol> context,
            UpdateSketch updateSketch,
            ActorRef<SketchUnionActor.UnionSketch> unionActorRef) {
        super(context);
        this.updateSketch = updateSketch;
        this.unionActorRef = unionActorRef;
    }

    public static Behavior<Protocol> create(
            ActorRef<SketchUnionActor.UnionSketch> unionActorRef,
            Duration unionInitialDuration, Duration unionRate) {
        return Behaviors.setup(ctx -> {
            ctx.getSystem().scheduler()
                    .scheduleAtFixedRate(
                            unionInitialDuration, unionRate,
                            () -> ctx.getSelf().tell(PersistRequest.getInstance()),
                            ctx.getSystem().executionContext());

            return new SketchActor(ctx, UpdateSketch.builder().build(), unionActorRef);
        });
    }

    @Override
    public Receive<Protocol> createReceive() {
        return newReceiveBuilder()
                .onMessage(SketchEvent.class, this::onSketchEvent)
                .onMessage(PersistRequest.class, this::onPersist)
                .build();
    }

    private Behavior<Protocol> onSketchEvent(SketchEvent event) {
        updateSketch.update(event.getValue());
        return Behaviors.same();
    }

    private Behavior<Protocol> onPersist(PersistRequest persist) {
        unionActorRef.tell(new SketchUnionActor.UnionSketch(updateSketch.compact()));
        return Behaviors.same();
    }

}

SketchActorからコンパクションされたスケッチを受け取り、UnionSketchにマージしていく。定期的にスナップショットをファイルとして公開している。

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.*;

import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Duration;

public class SketchUnionActor extends AbstractBehavior<SketchUnionActor.Protocol> {

    public interface Protocol {}

    public static final class UnionSketch implements Protocol {
        private final Sketch sketch;
        public UnionSketch(Sketch sketch) {
            this.sketch = sketch;
        }
        public Sketch getSketch() {
            return sketch;
        }
    }

    private static final class SnapshotRequest implements Protocol {
        private SnapshotRequest() {}
        private final static SnapshotRequest instance = new SnapshotRequest();
        public static SnapshotRequest getInstance() {
            return instance;
        }
    }

    private final Union union;

    private SketchUnionActor(ActorContext<Protocol> context) {
        super(context);
        union = SetOperation.builder().buildUnion();
    }

    public static Behavior<Protocol> create(Duration snapshotInitialDuration, Duration snapshotRate) {
        return Behaviors.setup(ctx -> {
            ctx.getSystem().scheduler()
                    .scheduleAtFixedRate(
                            snapshotInitialDuration, snapshotRate,
                            () -> ctx.getSelf().tell(SnapshotRequest.getInstance()),
                            ctx.getSystem().executionContext());

            return new SketchUnionActor(ctx);
        });
    }

    @Override
    public Receive<Protocol> createReceive() {
        return newReceiveBuilder()
                .onMessage(UnionSketch.class, this::onUnionSketch)
                .onMessage(SnapshotRequest.class, this::onSnapshotRequest)
                .build();
    }

    private Behavior<Protocol> onUnionSketch(UnionSketch unionSketch) {
        union.update(unionSketch.getSketch());
        return Behaviors.same();
    }

    private Behavior<Protocol> onSnapshotRequest(SnapshotRequest snapshotRequest) {
        final var compacted = union.getResult();
        final var snapshot = compacted.toByteArray();

        try (final var out = new FileOutputStream("snapshot.bin")) {
            System.out.println("snapshot start.");
            out.write(snapshot);
            System.out.println("snapshot end.");
            System.out.println(compacted.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }

        return Behaviors.same();
    }

}

main関数。Actorを適当にセットアップして、適当に値を送りつける。

import java.time.Duration;
import java.util.stream.IntStream;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

public final class Main {

    public static void main(String[] args) {
        final var guardian = Behaviors.setup(ctx -> {
            final var union = ctx.spawn(SketchUnionActor.create(
                    Duration.ofSeconds(180), Duration.ofSeconds(180)),
                    "union").<SketchUnionActor.UnionSketch>narrow();
            final var sketch1 = ctx.spawn(
                    SketchActor.create(union, Duration.ofSeconds(30), Duration.ofSeconds(30)), "sketch1");
            final var sketch2 = ctx.spawn(
                    SketchActor.create(union, Duration.ofSeconds(30), Duration.ofSeconds(30)), "sketch2");

            IntStream.range(0, 100000).forEach(i -> sketch1.tell(new SketchActor.SketchEvent(i)));
            IntStream.range(50000, 150000).forEach(i -> sketch2.tell(new SketchActor.SketchEvent(i)));

            return new AbstractBehavior<Object>(ctx) {
                @Override
                public Receive<Object> createReceive() {
                    return newReceiveBuilder().build();
                }
            };
        });

        ActorSystem.create(guardian, "system");
    }

}

上のコードだと、30秒間間隔で各SketchActorがSketchUnionActorにコンパクションされたSketchを送り、180秒間間隔でSketchUnionActorがスナップショットを公開する。以下は実行後180秒ほど経過した時のstdout。

snapshot start.
snapshot end.

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 149586.73149344584
   Upper Bound, 95% conf   : 154287.5017892762
   Lower Bound, 95% conf   : 145028.6046846571
   Theta (double)          : 0.027382107751846067
   Theta (long)            : 252555366948521403
   Theta (long) hex        : 038141c4a515c5bb
   EstMode?                : true
   Empty?                  : false
   Retained Entries        : 4096
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY

Akka JavaDSL、案外ScalaDSLと差異は少なく、使いやすそう。

HBaseをローカル分散モードで立ち上げてJavaから繋ぐまで

好き勝手やるためにローカル分散モードで立ち上げてJavaのHBase Clientから繋ぐまでやる。

  • HBaseローカル分散モードでコンテナ起動

GitHub - big-data-europe/docker-hbase

> git clone https://github.com/big-data-europe/docker-hbase.git
> docker-compose -f docker-compose-distributed-local.yml up -d
  • HBaseマスタのコンテナにログイン。使うテーブルを適当に作る。
> docker-compose -f docker-compose-distributed-local.yml exec hbase-master bash
> root@hbase-master:/# hbase shell

hbase(main):001:0> list
TABLE                                                                                
0 row(s) in 0.1540 seconds

=> []
hbase(main):002:0> create 'source_table', {NAME=>'e',VERSIONS=>10}
0 row(s) in 1.3220 seconds

=> Hbase::Table - source_table

hbase(main):005:0* scan 'source_table'
ROW                    COLUMN+CELL                                                   
0 row(s) in 0.0860 seconds
  • Gradleのdependenciesに追加。HBaseのバージョンが1.2.6なのでそれに準ずるバージョンを指定。
dependencies {
    ...
    compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.6'
    compile group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.6'
}
  • docker-composeの外側からHBaseに接続して適当なデータをPutするコード。
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

public final class ExampleReadWrite {

    public static void main(String[] args) throws IOException {
        final var config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost");
        config.set("hbase.zookeeper.property.clientPort","2181");

        try (final var conn = ConnectionFactory.createConnection(config)) {
            HBaseAdmin.checkHBaseAvailable(config);
            final var sourceTableName = Bytes.toBytes("source_table");

            final var put = new Put(Bytes.toBytes("z"));
            put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("c"), Bytes.toBytes(20));
            put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("c"), Bytes.toBytes(10));

            final var sourceTable = conn.getTable(TableName.valueOf(sourceTableName));
            sourceTable.put(put);

            sourceTable.close();
        } catch (IOException | ServiceException e) {
            System.out.println(e);
        }
    }

}
# for docker-hbase
127.0.0.1       hbase-master
127.0.0.1       hbase-region
  • また別のエラー。org.apache.hadoop.hbase.MasterNotRunningException: com.google.protobuf.ServiceException: java.net.ConnectException: Connection refusedがでる。ローカルから接続する際にdocker-compose-distributed-local.ymlport設定が足りていないので以下のように編集。(16000, 16020を追加する)
...
  hbase-master:
    image: bde2020/hbase-master:1.0.0-hbase1.2.6
    container_name: hbase-master
    hostname: hbase-master
    env_file:
      - ./hbase-distributed-local.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 zoo:2181"
    ports:
      - 16000:16000
      - 16010:16010
...
  hbase-region:
    image: bde2020/hbase-regionserver:1.0.0-hbase1.2.6
    container_name: hbase-regionserver
    hostname: hbase-regionserver
    env_file:
      - ./hbase-distributed-local.env
    environment:
      HBASE_CONF_hbase_regionserver_hostname: hbase-region
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 zoo:2181 hbase-master:16010"
    ports:
      - 16020:16020
      - 16030:16030
  • コードを実行するとようやく最後までrunされる。
hbase(main):006:0> scan 'source_table'
ROW                    COLUMN+CELL                                                   
 z                     column=e:c, timestamp=1582633454479, value=\x00\x00\x00\x0A   
1 row(s) in 0.0260 seconds

Javaで並行処理するのにサクッと使えるCompletableFuture

ScalaならFutureでも十分使いやすいのであまり困ることはないが、JavaFutureScalaFutureとは異なり使いにくい(個人的な感想)。 似たような方法で簡単に並行処理を行える方法はないか?と調べたところ、CompletableFutureなるものがあるのを見つけた。1.8から導入されたっぽい。

CompletableFuture#completedFutureCompletableFuture#failedFutureで実行完了済みのCompletableFutureを返すことができる。 値を返さないものはCompletableFuture#runAsyncRunnableを渡せばいいし、CompletableFuture#supplyAsyncSupplierを渡してthenXXXで処理を繋げることもできる。 CompletableFuture#allOfを使うと、複数のタスクを待ち受ける1つのCompletableFutureに纏め上げることも簡単になっている。

以下は、動作を確認するための最小コード。Java11で実行できる。

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class Main {

    public static void main(String[] args) {
        final var NUM_CONCURRENCY = 4;
        final var TASK_SIZE = 10;
        final var MAX_WAIT_SECOND = 5;
        final var MIN_WAIT_SECOND = 1;
        final var es = Executors.newFixedThreadPool(NUM_CONCURRENCY);
        final var random = new Random();

        final var tasks =
            Stream.generate(
                () -> random.nextInt(MAX_WAIT_SECOND - MIN_WAIT_SECOND + 1) + MIN_WAIT_SECOND)
                .map(waitSecond -> CompletableFuture.runAsync(() -> {
                    try {
                        System.out.println(String.format("start task: %d second.", waitSecond));
                        Thread.sleep(waitSecond * 1000L);
                        System.out.println(String.format("end task: %d second.", waitSecond));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, es))
                .limit(TASK_SIZE)
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(tasks).join();

        es.shutdown();
    }

}

実行結果は以下のような感じ。Thread#sleepブロッキング処理なので、スレッドを占有する。NUM_CONCURRENCY = 4で実行すると、タスクは4つしか同時に実行されない。

start task: 1 second.
start task: 5 second.
start task: 4 second.
start task: 2 second.
end task: 1 second.
start task: 2 second.
end task: 2 second.
start task: 1 second.
end task: 2 second.
start task: 3 second.
end task: 1 second.
start task: 1 second.
end task: 4 second.
start task: 3 second.
end task: 1 second.
start task: 5 second.
end task: 5 second.
end task: 3 second.
end task: 3 second.
end task: 5 second.

NUM_CONCURRENCY = 1で実行すると、タスクは逐次処理される事になる。(タスクの順番はインタリーブするかもしれない。)

2019年 今年読んだ技術書

今年読んだもの

www.oreilly.co.jp

www.oreilly.co.jp

www.oreilly.co.jp

www.oreilly.co.jp

www.oreilly.co.jp

www.maruzen-publishing.co.jp

book.impress.co.jp

www.shoeisha.co.jp

www.kinokuniya.co.jp

gihyo.jp

www.oreilly.co.jp

全部読んでないやつ

tatsu-zine.com

tatsu-zine.com

www.maruzen-publishing.co.jp

オンラインの技術書に近いもの

ボリュームがそこそこあって学びが多いものかつ、リンクを残しているもののみ

doc.akka.io

monix.io

typelevel.org

www.slideshare.net

thread-pools.md · GitHub

www.ibm.com

www.ibm.com

blog.takanabe.tokyo

speakerdeck.com

ascii.jp

データ指向アプリケーションデザインを読んでLSM-treeインデックスに基づくKVSを作る

この記事はMicroAd Advent Calendar 2019の4日目の記事です。

qiita.com

はじめに

データ指向アプリケーションデザインは今年買って読んだ技術書の中で最も読み応えがあった本でした。 www.oreilly.co.jp 単に周回で読むだけでも学びが多い本ですが、特定のツールや技術に依存しない話や抽象的な話が多く、分かったつもりになってしまいやすい本だと感じました。そこで、本で読んだことを浅い知識で終わらせず、より深い理解に至るためにシンプルなKVSを作ることにしてみました。(あと、Akka Typedが2.6.0がstableになったので、そちらの勉強も兼ねて。)また、実装言語はScalaになります。

この記事中では、現時点まで完成している機能までの解説に留めます。その為、データ指向アプリケーションデザインの 3.1データベースを駆動するデータ構造 の内容がメインになります。(たった一節読むだけで単一マシンで動作する簡単なKVSは作れるのでかなりお得な本ですね) また、本で説明されている内容をこの記事中で特に再説明はしません。行間で説明されていない実装の詳細にだけ解説を入れます。また、本の中で紹介されている用語を使用します。

指摘などありましたらお知らせいただけるとありがたいです。

どういうKVSを作るか

本当はレプリケーションやパーティショニングの機能も含む分散KVSを作りたかったのですが、今回は時間が足らず第3章のところまでです。なのでざっくりと以下のような機能を持つKVSを作りたいと思います。

  • キーは文字列のみを扱う 同様に値も文字列のみを格納できる
  • キーに紐付けた値の格納(set)ができる
  • キーに紐付いた値の取得(get)ができる
  • キーに紐付いている値の削除(delete)ができる
  • LSM-Treeインデックスに基づく
  • 再起動時に前に終了した状態から回復できる

LSM-Treeとは、という解説は本に任せますが、ざっくりとだけ復習を入れると「末尾への追記のみを行うようにすることで書き込み性能に特化するインデックス構造」のことです。

LSM-Treeを内部のインデックスとして使うことで、以下のような大きな利点があるKVSが作れます。

  • 書き込みが末尾追記ベースになるので、書き込みを高速に行える
  • インメモリデータであるMemTableにはキーと値が保持されるが、ディスクに退避しているSSTable(ソート済み文字列テーブル)の値はメモリに保持する必要がなく、またキーに付いても疎なキーインデックスを保持するだけで良い。つまり、メモリに全て載らないような大規模なデータでも効率的に取り扱うことができる。

では、具体的にやっていきましょう。

手始めにログベースのKVS

いきなりLSM-Treeをインデックスに持つKVSを作ろうとしてもしんどいので、とりあえず書き込みは末尾追記のみを行い、読み取りはキーを末尾から順に探すような超シンプルなKVSを作ってみます。

書き込み, 読み取り, 削除

キーに紐付いた値を末尾に追記していくと、新しい書き込みであればある程末尾に書き込まれていることが自明になります。つまり、あるキーの値を探そうと思った時には、現在の末尾から順にキーを探して、一番最初に見つかった値が現在のキーの値であることがわかります。最後まで見て、見つからなかった場合にはキーが存在しないことがわかります。

単に値を更新していくとなるとそれで十分ですが、ここでキーの削除を考慮に入れる必要があります。 ログベースのKVSで削除を表現するには、削除を示す特別な値(墓石と呼ばれる)を末尾に追記する必要があります。 すると、読み取り時に一番最初に見つかったものが削除を示す場合にはそのキーの値が削除されていることがわかります。

では書き込まれるログの具体的なフォーマットはどうすればいいでしょうか?

ログフォーマットの決定

ログフォーマットとして、特に何も考えないと単一の書き込みや削除を"${key},${value}\n"のようにカンマ区切りと改行で表現する方法があります。しかし、キーと値が文字列であることを先に述べました。これではキーや値にエスケープの必要がありますし、削除を表現する墓石の考慮を入れるとフォーマットがシンプルでなくなってしまいます。

そこで以下のようにバイナリフォーマットで末尾追記することにします。

f:id:dasshshsd:20191201182358p:plain
ログフォーマット

こういったバイナリフォーマットであれば、値の(バイナリ長が特定の負の整数)の場合に以下のようなフォーマットを墓石とすることで削除を表現できます。

f:id:dasshshsd:20191201182913p:plain
墓石の場合のログフォーマット

実装

ログフォーマットさえ決まれば、具体的にどのように書き込みを行い、読み込みを行うかになってきます。Scalaの場合、ファイルへのランダムアクセスの選択肢は非常に少なく、標準で使えるものとしてはJavajava.io.RandomAccessFileが候補に上がります。 書き込みの際には先ほどのフォーマットで順次バイナリを書き込んで行けばいいのですが、読み取りの際にはバイナリの読み込み毎にファイルポインタが元の位置に戻ってしまうので、RandomAccessFileの内部で管理されているポインタ位置をこちら側で別に管理してやる必要があります。

すると以下のような実装で、それっぽい感じになります。

import java.io.RandomAccessFile

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

class SimpleKvs(raf: RandomAccessFile, keyIndex: mutable.Map[String, Long]) {

  private final val KEY_HEADER_SIZE = Integer.BYTES
  private final val VALUE_HEADER_SIZE = Integer.BYTES
  private final val HEADER_SIZE = Integer.BYTES * 2
  private final val MIN_DATA_SIZE = HEADER_SIZE + 2
  private final val TOMBSTONE = Integer.MIN_VALUE

  def set(key: String, value: String): Try[Unit] = {
    val befPos = raf.length()
    try {
      raf.seek(raf.length())
      raf.writeBytes(value)
      raf.writeInt(value.length)
      raf.writeBytes(key)
      raf.writeInt(key.length)
    } catch {
      case e: java.io.IOException =>
        raf.setLength(befPos)
        return Failure(e)
    }
    keyIndex.update(key, raf.getFilePointer)
    Success(())
  }

  @scala.annotation.tailrec
  @throws[java.io.IOException]
  private def seek(key: Array[Byte], befPos: Long): Option[String] =
    if (befPos >= MIN_DATA_SIZE) {
      var pos = befPos
      raf.seek(pos)

      pos -= KEY_HEADER_SIZE
      raf.seek(pos)
      val keyLen = raf.readInt()
      pos -= keyLen
      raf.seek(pos)
      val currentKey = new Array[Byte](keyLen)
      raf.readFully(currentKey)

      pos -= VALUE_HEADER_SIZE
      raf.seek(pos)
      val valueLen = raf.readInt()
      if (valueLen == TOMBSTONE) {
        return None
      }
      pos -= valueLen

      if (currentKey.sameElements(key)) {
        val value = new Array[Byte](valueLen)
        raf.seek(pos)
        raf.readFully(value)
        Some(new String(value))
      } else {
        seek(key, pos)
      }
    } else None

  def get(key: String): Option[String] = {
    val pos = keyIndex.getOrElse(key, raf.length())
    try {
      seek(key.getBytes, pos)
    } catch {
      case _: java.io.IOException => None
    }
  }

  def del(key: String): Try[Unit] =
    get(key) match {
      case Some(_) =>
        val befPos = raf.length()
        try {
          raf.seek(raf.length())
          raf.writeInt(TOMBSTONE)
          raf.writeBytes(key)
          raf.writeInt(key.length)
        } catch {
          case e: java.io.IOException =>
            raf.setLength(befPos)
            return Failure(e)
        }
        keyIndex.update(key, raf.getFilePointer)
        Success(())

      case None =>
        Success(())
    }

}

object SimpleKvs {

  def apply(databaseFileName: String): SimpleKvs = {
    val file = new java.io.File(databaseFileName)
    if (!file.exists() && !file.createNewFile())
      throw new Throwable(
        s"can not initialize db file. ${file.getAbsolutePath}")

    val raf = new java.io.RandomAccessFile(file, "rw")
    new SimpleKvs(raf, mutable.Map.empty)
  }

}

ただ、この単純なログベースKVSでは、圧倒的に読み取り性能が悪いという問題があります。それもそのはずで、単純に値が見つかるまで書き込みを末尾からディスクシークする必要があるのでかなり遅くなってしまいます。

それでは、LSM-treeインデックスを持つKVSを作っていきましょう。

LSM-treeインデックスを持つKVSを実装する

さて、読み取り性能を高めるためには追加のデータ構造であるインデックスが必要不可欠になります。 本中ではハッシュインデックスやB-treeインデックスの説明もありますが、今回はLSM-treeインデックスを持つKVSを実装していきます。

LSM-treeインデックスの実装とはいっても色々方法はあると思います。今回はデータ指向アプリケーションデザインの内容を推測で実装していきます。実現にはインメモリデータ構造のMemTableとディスクに退避されているSSTableの二つのデータ構造が必要になります。 MemTableはキーでソートされている平衡木である必要があり、SSTableはMemTableをソート順に書き込んだものです。

書き込みや読み取りの時の流れを追っていきましょう

書き込み

書き込み時の概略図は以下のような感じです。

f:id:dasshshsd:20191201190024p:plain
書き込みの流れ

値を書き込む際にはまずインメモリデータ構造であるMemTableに書き込みを行います。この書き込みの際に、MemTableのサイズが閾値を超えている場合にはディスクに書き込みSSTableを生成します。その後、新たなMemTableを生成し今後はそこに書き込みをするようにします。

MemTableへの書き込みは競合が発生してしまうので単一のライターからしか行えません。なので、この単一のライターにはなるべく少ない仕事だけを処理させるようにする必要があり、「MemTableからSSTableを生成する処理」とライターが生成を依頼する箇所との間には非同期境界を設ける必要があります。すると、一つややこしいことが発生してしまいます。「MemTable」が「SSTable」として生成されるまでの間、その中に入っているデータに対するクエリが来た時、どうやって正しい結果を返せるでしょうか?

f:id:dasshshsd:20191201191858p:plain
SSTableを作っている間のクエリでデータが失われたように見えるケース

そのため、SSTableが生成完了するまでの間は、MemTableに対して読み取りを行う必要があります。 その結果、現在有効なログを管理するためのimmutableなデータ構造を用意しておきます。ここでログとは新規の書き込みが不可能になったMemTableとSSTableのことを指します。

f:id:dasshshsd:20191201192543p:plain
現在有効なログをimmutableなデータ構造で管理する

SSTableに対する読み込みは、ディスクアクセスが続く可能性があり、重い処理となる可能性があります。なので最新のMemTableに対象のキーが存在しなかった場合は、SSTableへの読み取りでは非同期境界を設ける必要があります。ここで有効なログがimmutableなデータ構造で管理されていることで、読み込み中に読み込み先のログが変化したりすることを防ぎます。

MemTableからSSTableの生成

では、MemTableからSSTableの生成を考えます。 既にMemTableは赤黒木などのキーでソートされる平衡木であればいいことを説明されていますが、ScalaのTreeMapは内部実装が赤黒木のソート済みツリーです。ですので自力で実装する必要はなく、MemTableはこのTreeMapのラッパーとして実装すれば十分でしょう。(キーの昇順でソートします。

/**
  * A mutable sorted map implemented using a mutable red-black tree as underlying data structure.
  *
  * @param ordering the implicit ordering used to compare objects of type `A`.
  * @tparam K the type of the keys contained in this tree map.
  * @tparam V the type of the values associated with the keys.
  *
  * @define Coll mutable.TreeMap
  * @define coll mutable tree map
  */
sealed class TreeMap[K, V] private (tree: RB.Tree[K, V])(implicit val ordering: Ordering[K])

するとTreeMapのiteratorを呼び出して、順に書き込みを行うとSSTableが生成できます。 SSTableのバイナリフォーマットは、先ほどのログベースKVSのものとほぼ同じです。

先ほどのログベースの場合と大きく異なる点は、1つのSSTableには1つのキーは1度しか登場しないということです。また、キーの昇順で書き込みが行われています。 この特性を利用すると、SSTableのキーとそのバイトオフセットをいくつかサンプリングして疎なキーインデックスを保持しておくことで、SSTableへの値の読み取りを小さな領域のスキャンに留めることができます。また、キーインデックスが疎なのでメモリ効率も高いです。

f:id:dasshshsd:20191202004040p:plain
疎なキーインデックス

上は疎なキーインデックスのイメージです。二つの配列を用意します。ある添字はサンプリングしたキーとそのバイトオフセットに対応します。キーインデックスに含まれているキーは必ずSSTableに含まれることが保証されていますが、それ以外は保証されていません。

あるキーを探したい時には、そのキー以下のキーの中で最も大きいインデックスキーと、その次のインデックスキーのバイトオフセットの間を探索します。探す対象のキー以下のキーの中で最も大きいインデックスキーは、キーインデックスに対する二分探索で求まります。これによって、キーが存在する可能性がある領域だけのディスクシークに留めることができます。キーインデックスのサンプル間隔が広ければ広いほど必要になるメモリ領域は小さくなりますが、そこは計算速度とのトレードオフとなります。

図で具体的な例をあげると、キーであるdarの値を取得したい時には、carのバイトオフセット値である134からfooの180までをディスクシークして、キーが存在すればその値を、存在しなければ存在しないことを返却すればOKです。

MemTableからの書き込みの際に、このような疎なキーインデックスを構築しながら、ディスクにバイナリフォーマットで書き込みを行なっていくことでSSTableを作成することができます。

SSTableの反映

SSTableを作成したら、読み取り時に使用できるように反映しなければなりません。 SSTableを作成している最中は、現在利用可能なログとしてMemTableを代わりに使用し、読み取りを行なっていました。なので、読み取りをMemTableからSSTableへと切り替える必要があります。これは単純に、現在有効なログを管理する不変データ構造のMemTableをSSTableに切り替えてやるだけです。

SSTableのマージとコンパクション

MemTableが一定の閾値を越えると、それがSSTableとしてディスクに書き込まれていきます。SSTableはその中で1つのキーは1度しか出現しないようになっており一定期間のコンパクションされたログだと考えることができますが、複数のSSTableをマージしてコンパクションすると、複数のSSTableから新しいSSTableを生成でき、全体としてのサイズを小さくすることができます。 各SSTableが既にキーで昇順ソートされているので、複数のSSTableをマージソートの亜種でコンパクションしながら新しい1つのSSTableにマージすることが可能です。

マージ対象のSSTableがメモリに載るサイズのものなら、古いSSTableから新しいSSTableのキーを順にソート済み平衡木に適用していけば簡単にマージすることができます。 しかし、SSTableはそのサイズが大きいことからディスクに退避しています。ディスクシークをなるべく少なく、効率的にマージとコンパクションを行いたいところです。

そこで以下のようにマージ対象のSSTableに対応するキーを1つだけ読み込んでおくバッファを用意しておきます。

f:id:dasshshsd:20191202012508p:plain

このバッファの状態として、

  • キー読み込み済み
  • EOF

の3つがあります。初期状態では空になっており、SSTableが読み込み終わっているとEOFになり、最終的には全てのSSTableに対応するバッファがEOFになるとマージが完了したと言えます。

では、このようなバッファを使ったマージ&コンパクションを説明します。

バッファを使ったマージ&コンパクション

書き込み先のSSTableを1つ用意しておきます。

ステップ1. SSTable毎にキーを読み出す

まず、空になっているバッファに対応するSSTableから、キーをバッファに読み出します。SSTableはキーの昇順ソートで書き込まれているので、キーが小さい順にバッファに読み込まれます。このステップの後、全てのバッファは読み込み済みのキーが格納されているか、EOFに到達しています。

f:id:dasshshsd:20191202013645p:plain

ステップ2. バッファ内の最小のキーの最新の値を読み出す

現在のバッファ内での最小のキーを求めます。 そして最小のキーを持つ最新のSSTableから値を読み出し、書き込み先のSSTableにバイナリフォーマットでキーと値を書き出します。 最小のキーを持つSSTableのバッファは全て空にして、読み出さなかった最小のキーを持つ各SSTableの値はスキップします。(この操作により、古い値は捨てられるのでコンパクションされている)

f:id:dasshshsd:20191202014405p:plain

そして全てのバッファがEOFに到達するまで、ステップ1とステップ2を繰り返します。

この時、バッファ分だけキーを読み込むだけなので、マージ対象のファイルがどれだけ大きくてもマージ可能です。また、ファイルポインタの明示的な移動無しで読み出し可能です。

マージが完了した際には、「SSTableの反映」と同様に、マージ対象だったSSTableを利用可能なログから削除し、マージ後のSSTableを反映させます。

このマージ自体は、各SSTableのイテレータをマージするイテレータとして実装可能で、以下のようになります。

/**
  * @param sSTables 新しい順(シーケンス番号が大きい順)に並んだSSTableのReaderのIterator
  */
class SSTableMergeIterator private (sSTables: IndexedSeq[SSTableReader])
    extends Iterator[(String, Value)] {

  private val BUFFER_IS_EMPTY = 0
  private val BUFFER_IS_FILLED = 1
  private val REACHED_EOF = 2

  private val SEGMENT_FILE_SIZE = sSTables.size
  private val emptyBuffer = Array.fill(SEGMENT_FILE_SIZE)(BUFFER_IS_EMPTY)
  private val keyBuffer = new Array[String](SEGMENT_FILE_SIZE)

  /**
    * 読み込みを行い, 次の値が存在するかどうかを確認する.
    * @return 全てのSSTableがEOFに達していれば true
    */
  override def hasNext: Boolean = {
    fulfillBuffer()
    !emptyBuffer.forall(_ == REACHED_EOF)
  }

  private def readNextOf(i: Int): Unit = {
    if (emptyBuffer(i) == BUFFER_IS_EMPTY) {
      if (sSTables(i).hasNext) {
        keyBuffer(i) = sSTables(i).readKey()
        emptyBuffer(i) = BUFFER_IS_FILLED
      } else {
        emptyBuffer(i) = REACHED_EOF
      }
    }
  }

  private def fulfillBuffer(): Unit =
    (0 until SEGMENT_FILE_SIZE) foreach readNextOf

  private def bufferFilledMinKey(): String =
    (0 until SEGMENT_FILE_SIZE)
      .filter(emptyBuffer(_) == BUFFER_IS_FILLED)
      .map(keyBuffer(_))
      .min

  /**
    * @return 現在バッファに読み込まれているキーの中で最小のキーの中で最新の値を返す.
    */
  override def next(): (String, Value) = {
    val minKey = bufferFilledMinKey()

    @tailrec
    def loop(i: Int): Value =
      if (i < SEGMENT_FILE_SIZE) {
        if (emptyBuffer(i) == BUFFER_IS_FILLED && keyBuffer(i) == minKey) {
          val latest = sSTables(i).readValue()
          emptyBuffer(i) = BUFFER_IS_EMPTY
          loopAfterLatest(i + 1, latest)
        } else {
          loop(i + 1)
        }
      } else {
        Value.Deleted // not reach this code.
      }

    @tailrec
    def loopAfterLatest(i: Int, latest: Value): Value =
      if (i < SEGMENT_FILE_SIZE) {
        if (emptyBuffer(i) == BUFFER_IS_FILLED && keyBuffer(i) == minKey) {
          sSTables(i).skipValue()
          emptyBuffer(i) = BUFFER_IS_EMPTY
        }
        loopAfterLatest(i + 1, latest)
      } else latest

    (minKey, loop(0))
  }

}

再起動時の復元

KVSが停止し、再度起動した際には、停止前の状態を復元できなくてはなりません。SSTableに関しては既にディスクに書き出しており、再起動時に読み出せば問題ありません。ただ、最後の時点でどのSSTableが有効だったのか?SSTableの新しさはどうやって判断するべきか?という問題があります。

今回はSSTableを書き出すファイル名の命名規則を決めることで、SSTableファイルの新しさを決定します。具体的にはSSTableに1つずつシーケンス番号を割り当てます。 一番最初に起動した際には、このシーケンス番号を0として割り当て、MemTableをSSTableに書き出すたびにシーケンス番号を2ずつ加算して割り当てます。 すると、SSTableには0, 2, 4, 6, ...というような飛び飛びのシーケンス番号が割り当てられていくことになります。そして現在有効なSSTableのシーケンス番号の集合を管理し、次のSSTableのシーケンス番号を統計ファイルに書き出して管理しておきます。

なぜ飛び飛びにシーケンス番号を割り当てているのか?という理由は、今回LSM-treeインデックスでのマージ戦略として、SSTableが書き出し終わったタイミングでのマージを試みるためです。 例えば現在有効なSSTableが0, 2, 4とある場合、MemTableがSSTableに書き出されて有効なSSTableとして加わると0, 2, 4, 6になります。このタイミングで一番新しいSSTableから連続するSSTableをマージして新しいSSTableを生成します。

f:id:dasshshsd:20191202133602p:plain
シーケンス番号とマージのイメージ

この時、マージする対象の最新のシーケンス番号である6に1を加えた7を新しいシーケンス番号として割り当てると、シーケンス番号が衝突せず、SSTableの新しさを管理することができます。マージ戦略が異なればまた違う管理を行う必要があるかもしれません。

という訳でSSTableは統計ファイルに書き出した情報から復元できることがわかります。 では、インメモリデータ構造であるMemTableはどうでしょうか。SSTableに書き出される前のMemTableの中身は、何もしなければ終了時には消えてしまいます。

このMemTableを復元する為の仕組みとしてWAL(Write Aheat Log)があります。これは単にバイナリフォーマットで書き込みを末尾追記しただけのコンパクションされていないログです。仕組みとしてはログベースのKVSの時とほぼ同じです。 MemTableに値を追加する時に、別にWALに書き出しておきます。すると、KVSの再起動時には、WALをリプレイしてMemTableを再構築すればいいだけなので、簡単に復元できるようになっています。WALへの書き出しは末尾に追記するだけなので高速です。

このシーケンス番号の管理と、WALによって、再起動時に前の状態を復元することができます。

(しかし、現状の戦略ではデータが損失される可能性がある箇所があります。それは、MemTableがSSTableに書き出されている最中にシステムが異常終了してしまった場合です。現状最新のMemTableに対応するWALしか用意していないので、その場合、SSTableになるはずだったMemTableのデータが消失してしまいます。これを解決するにはWALもSSTableのように複数個管理する必要がありそうです。)

アクターで組み立てる

さて、ここまでベースとなる機能の説明を行ってきました。後はこれらの機能を上手く組み立てれば完成です。しかし、SSTableの生成やマージは計算資源を分けるために非同期境界を設けたり、MemTableや現在有効なログの管理などのクリティカルセクションが存在します。これらを排他ロックを使って色々やるのはしんどいので、アクターを使ってクリティカルセクションを上手く包んでみましょう。

アクターが何か?という説明はAkka Typedのドキュメントに書いてあるので省略します。なお、今回の実装ではAkka Typed 2.6.0を使用しています。 doc.akka.io

アクターの階層は以下の図のようになります。ルートアクターがクリティカルセクションを保護しており、データの不整合状態を防ぎます。ファクトリアクターがSSTableの生成、マージを担当します。

f:id:dasshshsd:20191202134417p:plain

メッセージパッシングの概略図は以下のような感じです。

書き込みのケース

f:id:dasshshsd:20191202143257p:plain

読み取りのケース

f:id:dasshshsd:20191202144153p:plain

これでLSM-treeインデックスに基づくKVSが完成しました!

コード

コードは以下のリポジトリに全ていれてあります。 github.com

MySQLのCREATE TABLE文からサンプルデータを自動生成してみる

MySQLのCREATE TABLE文から自動でサンプルデータを生成するようなプログラムを書いてみました。

動機

実運用前のサービスでは、当然ながら使用するテーブルには実際のデータが入っていません。その為、実運用時のパフォーマンスをある程度予測したい時に、サンプルデータを入れようと思った時には自力で用意する必要があるので一苦労する可能性があります。対象となるテーブルが、外部キー制約やユニークキー制約が無かったりするシンプルなスキーマでない場合にはさらに苦労が増えるかもしれません。

そこで、依存しているテーブルのCREATE TABLE文をまとめて突っ込むことで、依存関係のあるテーブル間で制約を満たすようなデータをまとめて生成するようなツールがあったらいいなと思った次第です。あと、EXPLAINで実行計画をみる時に、実際にある程度データが入ってないと実行計画が変わったりするので、それを確認するためにサクッとデータを作れたりすると良さそうです。

ではどのようにすれば制約を考慮してデータを生成できるでしょうか。今回のプログラムにおける外部キー制約の解決についてみていきます。

外部キー制約の解決

外部キー制約の元では、参照先のテーブルのカラムで使われているデータのみをそれを参照するテーブルのカラムのデータとして使用しなければなりません。例えば、外部キー制約があるTable1のcolumn1がTable2のcolumn1に参照している場合には、Table2のcolumn1に存在しないデータを、Table1のcolumn1のデータとして使用することはできません。

となると、「参照されるテーブル」から「参照するテーブル」の順にデータを生成しなければなりません。 テーブル間には相互に参照する関係は許されていません。もし相互に参照する関係があれば、テーブルがお互い空の場合に新しくデータを追加することができなくなります。(あるテーブルにデータを追加するには、参照先のデータが必要だが、参照先のデータを用意するには、こちらのデータも必要になる。)

つまり、テーブルの参照関係は閉路が存在しない有向非巡回グラフとなります。これはトポロジカルソートが可能で、テーブルの参照関係に対してトポロジカルソートを行うと、テーブルを依存の方向順に並び替えることができます。以下の図のような感じです。

f:id:dasshshsd:20191102020710p:plain
テーブルの依存関係の例

こうして依存の方向順に並び替えることができたので、依存される方から依存する方へと、データを生成し、生成したデータを渡しながら続けていくと、外部キー制約を満たしつつサンプルデータを生成することが可能になります。

しかしこの方式にはこのままでは少し欠点があります。それは、生成したのテーブルのデータを、後続のテーブルのデータの生成ステップでも全てメモリ上に持ち続けているという点です。データ量が少ないうちであれば、これは特に問題になりませんが、大量のデータを生成するとなった際にはこれが原因でメモリが足りなくなるかもしれません。これを解決するには、閾値を超えた量のデータを生成する際に、生成したデータをバッファに溜めてディスク書き込みを行い、後続のデータ生成で必要になった際にはディスクから読み込みながら処理を行う方法も考えられます。が、今回は実装していません。

もう一種類の制約として、ユニークなカラム値の組み合わせを一意にするもの(ユニークキー、プライマリキー制約)が存在します。

ユニークキー、プライマリキー制約の解決

制約のない単なる値を生成する場合とは異なり、カラム値の組み合わせで一意にする必要がある場合には「各カラムでそのまま値を生成し、それらの値の組み合わせがユニークかチェックし、ユニークなものだけを残す」のような方法では効率が悪いと考えられます。そこで、カラム値の組み合わせを列挙することで、ユニークなカラム値の組み合わせのみを生成することにします。これは、各カラム値の組み合わせを全探索することで全列挙できます。が、生成したい行数分だけ列挙できれば十分であることに注意する必要があります。例えば、column1, column2, column3のカーディナリがそれぞれ10, 10, 10だったとします。すると、ユニークな組み合わせは10 x 10 x 10で 1000通り生成することができるのですが、実際に必要なのは10通りだとすれば、これは無駄に生成することになります。ですので、全探索を打ち切る必要が出てきます。木構造の全探索を実装しようとすると、生成するデータが非常に大きい場合にはスタックオーバーフローの恐れがあります。呼び出し状態を管理するスタックに出し入れすることで末尾再帰化しています。


といった感じで実装しています。では実際に使用例を軽く乗っけておきます。

使用例

  • 呼び出し
import csdfs.mysql.MySQLCsdfs

val csdfs = csdfs.mysql.MySQLCsdfs.instance

csdfs.generateInsertStatements(
    Seq(
      """
        |create table table1 (
        |  id int not null auto_increment,
        |  column1 mediumint not null unique,
        |  column2 enum('value1', 'value2') null,
        |  column3 varchar(64) not null,
        |
        |  foreign key (column3)
        |  references table2 (column2)
        |)
      """.stripMargin,
      """
        |create table table2 (
        |  id int not null auto_increment,
        |  column1 char not null,
        |  column2 varchar(64) not null,
        |
        |  primary key (id, column1)
        |)
      """.stripMargin
    ),
    GenConf(
      Map((Table("table1"),
        GenConf.GenTableConf(Table("table1"), 10,
          Map((Column("column2"),
            GenConf.GenColumnConf(Column("column2"), cardinality = 2))))))
    )
)
  • 返却値
Right(List(
"""INSERT INTO table1 (id, column1, column2, column3) VALUES 
(628505154, 6367780, value1, 'twpLBNpv0TYukg4vYQHZ8GTCKrZxc37ZIQvOziPNIffaJ2g4SBI8FvjCg8OYNxTQ'),
(1802938845, 16516874, value1, 'QeNg2mt0XiXUjK1US4sAaDvlO5J8ccQLe3MpqBfd19jIJRhmWCDXYVGxqJ0UJneH'),
(122688015, 7907605, value1, 'KNwEHrgqKPg9wDkY6Ix7XAgO1WVEywI2BgyrU4Kj0uEkw3hnBm2iMqlDmWCWb1MV'),
(738062263, 6200986, value1, 'tG1v1w5A03Pz4RKZUvg6lbrS29v9C66uCGqNk8iGl32YLjoRdkQwMdHPHvuDT58l'),
(887723347, 5461001, value1, 'ZcrDCsMD8aNVQi2ma35cmwOuKcDbijxCENUtZlIQvT9qPheCbnCAxZ8uL58etFzn'),
(76465175, 13984979, value1, 'VEtdhE8RlfxQ2F4tqv0oJ1eflJzfSV25hO6hU5hBjbcmOn3k6z8amS9M4IB376j6'),
(217732770, 14715916, value1, '2GdCrDXINvBD7NWvKmeXFsPqAsmGg9hd21X2nB6KTDykaA0iyFZLvOUPerJS1Hww'),
(227990809, 6879586, value1, 'hkEOdLYQ3ju0NH3lwMsYFVREtHuh84AZXzWdkOFNc3wjlTszuC19YtZ6v76APGEu'),
(1740430165, 13493854, value2, '7nLgoLxQoqu6B8B77y0ZI57XgMbXBv0uQ5nfkF0CyYhQJInvJDby0ZLocFRjQIzE'),
(296392555, 261530, value1, 'vqF3QStaVdS9xh9K7vQBgWkdLLrmvGCt1mMxqrmjrlUidm8FNY5riaLp8PskTO1e');""",
"""INSERT INTO table2 (column2, id, column1) VALUES 
('VEtdhE8RlfxQ2F4tqv0oJ1eflJzfSV25hO6hU5hBjbcmOn3k6z8amS9M4IB376j6', 1257347204, '6'),
('2GdCrDXINvBD7NWvKmeXFsPqAsmGg9hd21X2nB6KTDykaA0iyFZLvOUPerJS1Hww', 1257347204, 'l'),
('KNwEHrgqKPg9wDkY6Ix7XAgO1WVEywI2BgyrU4Kj0uEkw3hnBm2iMqlDmWCWb1MV', 1257347204, 'F'),
('tG1v1w5A03Pz4RKZUvg6lbrS29v9C66uCGqNk8iGl32YLjoRdkQwMdHPHvuDT58l', 1257347204, 'm'),
('hkEOdLYQ3ju0NH3lwMsYFVREtHuh84AZXzWdkOFNc3wjlTszuC19YtZ6v76APGEu', 1257347204, 'b'),
('vqF3QStaVdS9xh9K7vQBgWkdLLrmvGCt1mMxqrmjrlUidm8FNY5riaLp8PskTO1e', 1257347204, 'x'),
('QeNg2mt0XiXUjK1US4sAaDvlO5J8ccQLe3MpqBfd19jIJRhmWCDXYVGxqJ0UJneH', 1257347204, 'A'),
('7nLgoLxQoqu6B8B77y0ZI57XgMbXBv0uQ5nfkF0CyYhQJInvJDby0ZLocFRjQIzE', 1257347204, 'v'),
('ZcrDCsMD8aNVQi2ma35cmwOuKcDbijxCENUtZlIQvT9qPheCbnCAxZ8uL58etFzn', 1257347204, 'd'),
('twpLBNpv0TYukg4vYQHZ8GTCKrZxc37ZIQvOziPNIffaJ2g4SBI8FvjCg8OYNxTQ', 1257347204, 'U');"""))

今回作成したもののリポジトリは以下になります。

github.com

ゴルーチンによるorチャネルパターン

Go言語による並行処理4.4 orチャネル」 で、複数のチャネルをまとめて、まとめたチャネルのうちどれか1つでも閉じられた場合、まとめたチャネルも閉じるようにしたい。という場合に、orチャネルというパターンがあることが紹介されている。

「この関数はかなり簡潔になっていて」という説明があるが、関数自体を自分で再実装しようと思った時に少し理解しづらいように感じたので、別パターンの実装も用意して考えてみた。

元のor関数の実装はここには記載しないが、元の関数の場合の具体的なケースを図にしてみる。これはs1~s6のチャネルを1つのチャネルにまとめあげた場合である。

f:id:dasshshsd:20191022005323p:plain
元の関数で組み立てた時の依存グラフ

図にすると分かりやすい。ノードはチャネルを表しており、矢印の向きは監視しているチャネルを指す。あるチャネルが1つでも閉じられると、そのチャネルを監視対象とするチャネルが閉じられる。orDoneは循環しているので、どれか1つでも閉じられれば、全てのorDoneが閉じられる事になる。 つまりのところ、orDoneを循環させればいいという話である。末端のorDoneに閉じられたという情報が流れてくるので、末端のノードを全てのorDoneに繋げてしまえば全てのorDoneが閉じられることになる。

その点を踏まえて、一つ代替実装を書いてみた。末端のノードになる予定のorDoneチャネルを一つだけ先に用意しておく。再帰関数でチャネルを畳み込む時に、末端のorDoneチャネルから受け取れるように繋ぎ込んでおく。最後に、末端のorDoneチャネルだけは別途ゴルーチンで接続する。

var myOr func(channels ...<-chan interface{}) <-chan interface{}
myOr = func(channels ...<-chan interface{}) <-chan interface{} {
   switch len(channels) {
   case 0:
      return nil
   case 1:
      return channels[0]
   }

   lastOrDone := make(chan interface{})
   var joinRec func(channels ...<-chan interface{}) <-chan interface{}
   joinRec = func(channels ...<-chan interface{}) <-chan interface{} {
      if len(channels) == 1 {
         return channels[0]
      }
      orDone := make(chan interface{})
      go func() {
         defer close(orDone)
         select {
         case <-channels[0]:
         case <-lastOrDone: // 末端のノードのチャネルを繋ぎこむ
         case <-joinRec(channels[1:]...):
         }
      }()
      return orDone
   }

   joined := joinRec(channels[1:]...)
   go func() {
      defer close(lastOrDone)
      select {
      case <-channels[0]:
      case <-joined:
      }
   }()
   return lastOrDone
}

より直感的な書き方をすると、以下の図のような依存グラフが出来上がる。先ほどの元のor関数で作られた依存グラフとは少し異なる。

f:id:dasshshsd:20191022005309p:plain
myOr関数で組み立てた時の依存グラフ

元の関数が、まとめる対象のチャネルの数 x に対して必要なゴルーチンの数 x/2 だったのに対し、今回書いたものは x-1 個のゴルーチンが必要になる。あきらかに効率が悪いことがわかる。

効率をあげるためには再帰関数の1ステップでまとめあげるチャネルの数を増やせば良い。まとめあげるチャネルの数がわかっていれば、当然まとめあげるチャネル全てから受信するselect文を用意すれば、ゴルーチンの数は1つで済む。そうでないので、部分的にまとめていかざるをえない。再帰関数1ステップでまとめるチャネルの数が増えれば増えるほど、関数は冗長になるのでトレードオフの結果、本では3つというようになっているのだろう。

特に学びのない投稿になった。