データ指向アプリケーションデザインを読んでLSM-treeインデックスに基づくKVSを作る

この記事はMicroAd Advent Calendar 2019の4日目の記事です。

qiita.com

はじめに

データ指向アプリケーションデザインは今年買って読んだ技術書の中で最も読み応えがあった本でした。 www.oreilly.co.jp 単に周回で読むだけでも学びが多い本ですが、特定のツールや技術に依存しない話や抽象的な話が多く、分かったつもりになってしまいやすい本だと感じました。そこで、本で読んだことを浅い知識で終わらせず、より深い理解に至るためにシンプルなKVSを作ることにしてみました。(あと、Akka Typedが2.6.0がstableになったので、そちらの勉強も兼ねて。)また、実装言語はScalaになります。

この記事中では、現時点まで完成している機能までの解説に留めます。その為、データ指向アプリケーションデザインの 3.1データベースを駆動するデータ構造 の内容がメインになります。(たった一節読むだけで単一マシンで動作する簡単なKVSは作れるのでかなりお得な本ですね) また、本で説明されている内容をこの記事中で特に再説明はしません。行間で説明されていない実装の詳細にだけ解説を入れます。また、本の中で紹介されている用語を使用します。

指摘などありましたらお知らせいただけるとありがたいです。

どういうKVSを作るか

本当はレプリケーションやパーティショニングの機能も含む分散KVSを作りたかったのですが、今回は時間が足らず第3章のところまでです。なのでざっくりと以下のような機能を持つKVSを作りたいと思います。

  • キーは文字列のみを扱う 同様に値も文字列のみを格納できる
  • キーに紐付けた値の格納(set)ができる
  • キーに紐付いた値の取得(get)ができる
  • キーに紐付いている値の削除(delete)ができる
  • LSM-Treeインデックスに基づく
  • 再起動時に前に終了した状態から回復できる

LSM-Treeとは、という解説は本に任せますが、ざっくりとだけ復習を入れると「末尾への追記のみを行うようにすることで書き込み性能に特化するインデックス構造」のことです。

LSM-Treeを内部のインデックスとして使うことで、以下のような大きな利点があるKVSが作れます。

  • 書き込みが末尾追記ベースになるので、書き込みを高速に行える
  • インメモリデータであるMemTableにはキーと値が保持されるが、ディスクに退避しているSSTable(ソート済み文字列テーブル)の値はメモリに保持する必要がなく、またキーに付いても疎なキーインデックスを保持するだけで良い。つまり、メモリに全て載らないような大規模なデータでも効率的に取り扱うことができる。

では、具体的にやっていきましょう。

手始めにログベースのKVS

いきなりLSM-Treeをインデックスに持つKVSを作ろうとしてもしんどいので、とりあえず書き込みは末尾追記のみを行い、読み取りはキーを末尾から順に探すような超シンプルなKVSを作ってみます。

書き込み, 読み取り, 削除

キーに紐付いた値を末尾に追記していくと、新しい書き込みであればある程末尾に書き込まれていることが自明になります。つまり、あるキーの値を探そうと思った時には、現在の末尾から順にキーを探して、一番最初に見つかった値が現在のキーの値であることがわかります。最後まで見て、見つからなかった場合にはキーが存在しないことがわかります。

単に値を更新していくとなるとそれで十分ですが、ここでキーの削除を考慮に入れる必要があります。 ログベースのKVSで削除を表現するには、削除を示す特別な値(墓石と呼ばれる)を末尾に追記する必要があります。 すると、読み取り時に一番最初に見つかったものが削除を示す場合にはそのキーの値が削除されていることがわかります。

では書き込まれるログの具体的なフォーマットはどうすればいいでしょうか?

ログフォーマットの決定

ログフォーマットとして、特に何も考えないと単一の書き込みや削除を"${key},${value}\n"のようにカンマ区切りと改行で表現する方法があります。しかし、キーと値が文字列であることを先に述べました。これではキーや値にエスケープの必要がありますし、削除を表現する墓石の考慮を入れるとフォーマットがシンプルでなくなってしまいます。

そこで以下のようにバイナリフォーマットで末尾追記することにします。

f:id:dasshshsd:20191201182358p:plain
ログフォーマット

こういったバイナリフォーマットであれば、値の(バイナリ長が特定の負の整数)の場合に以下のようなフォーマットを墓石とすることで削除を表現できます。

f:id:dasshshsd:20191201182913p:plain
墓石の場合のログフォーマット

実装

ログフォーマットさえ決まれば、具体的にどのように書き込みを行い、読み込みを行うかになってきます。Scalaの場合、ファイルへのランダムアクセスの選択肢は非常に少なく、標準で使えるものとしてはJavajava.io.RandomAccessFileが候補に上がります。 書き込みの際には先ほどのフォーマットで順次バイナリを書き込んで行けばいいのですが、読み取りの際にはバイナリの読み込み毎にファイルポインタが元の位置に戻ってしまうので、RandomAccessFileの内部で管理されているポインタ位置をこちら側で別に管理してやる必要があります。

すると以下のような実装で、それっぽい感じになります。

import java.io.RandomAccessFile

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

class SimpleKvs(raf: RandomAccessFile, keyIndex: mutable.Map[String, Long]) {

  private final val KEY_HEADER_SIZE = Integer.BYTES
  private final val VALUE_HEADER_SIZE = Integer.BYTES
  private final val HEADER_SIZE = Integer.BYTES * 2
  private final val MIN_DATA_SIZE = HEADER_SIZE + 2
  private final val TOMBSTONE = Integer.MIN_VALUE

  def set(key: String, value: String): Try[Unit] = {
    val befPos = raf.length()
    try {
      raf.seek(raf.length())
      raf.writeBytes(value)
      raf.writeInt(value.length)
      raf.writeBytes(key)
      raf.writeInt(key.length)
    } catch {
      case e: java.io.IOException =>
        raf.setLength(befPos)
        return Failure(e)
    }
    keyIndex.update(key, raf.getFilePointer)
    Success(())
  }

  @scala.annotation.tailrec
  @throws[java.io.IOException]
  private def seek(key: Array[Byte], befPos: Long): Option[String] =
    if (befPos >= MIN_DATA_SIZE) {
      var pos = befPos
      raf.seek(pos)

      pos -= KEY_HEADER_SIZE
      raf.seek(pos)
      val keyLen = raf.readInt()
      pos -= keyLen
      raf.seek(pos)
      val currentKey = new Array[Byte](keyLen)
      raf.readFully(currentKey)

      pos -= VALUE_HEADER_SIZE
      raf.seek(pos)
      val valueLen = raf.readInt()
      if (valueLen == TOMBSTONE) {
        return None
      }
      pos -= valueLen

      if (currentKey.sameElements(key)) {
        val value = new Array[Byte](valueLen)
        raf.seek(pos)
        raf.readFully(value)
        Some(new String(value))
      } else {
        seek(key, pos)
      }
    } else None

  def get(key: String): Option[String] = {
    val pos = keyIndex.getOrElse(key, raf.length())
    try {
      seek(key.getBytes, pos)
    } catch {
      case _: java.io.IOException => None
    }
  }

  def del(key: String): Try[Unit] =
    get(key) match {
      case Some(_) =>
        val befPos = raf.length()
        try {
          raf.seek(raf.length())
          raf.writeInt(TOMBSTONE)
          raf.writeBytes(key)
          raf.writeInt(key.length)
        } catch {
          case e: java.io.IOException =>
            raf.setLength(befPos)
            return Failure(e)
        }
        keyIndex.update(key, raf.getFilePointer)
        Success(())

      case None =>
        Success(())
    }

}

object SimpleKvs {

  def apply(databaseFileName: String): SimpleKvs = {
    val file = new java.io.File(databaseFileName)
    if (!file.exists() && !file.createNewFile())
      throw new Throwable(
        s"can not initialize db file. ${file.getAbsolutePath}")

    val raf = new java.io.RandomAccessFile(file, "rw")
    new SimpleKvs(raf, mutable.Map.empty)
  }

}

ただ、この単純なログベースKVSでは、圧倒的に読み取り性能が悪いという問題があります。それもそのはずで、単純に値が見つかるまで書き込みを末尾からディスクシークする必要があるのでかなり遅くなってしまいます。

それでは、LSM-treeインデックスを持つKVSを作っていきましょう。

LSM-treeインデックスを持つKVSを実装する

さて、読み取り性能を高めるためには追加のデータ構造であるインデックスが必要不可欠になります。 本中ではハッシュインデックスやB-treeインデックスの説明もありますが、今回はLSM-treeインデックスを持つKVSを実装していきます。

LSM-treeインデックスの実装とはいっても色々方法はあると思います。今回はデータ指向アプリケーションデザインの内容を推測で実装していきます。実現にはインメモリデータ構造のMemTableとディスクに退避されているSSTableの二つのデータ構造が必要になります。 MemTableはキーでソートされている平衡木である必要があり、SSTableはMemTableをソート順に書き込んだものです。

書き込みや読み取りの時の流れを追っていきましょう

書き込み

書き込み時の概略図は以下のような感じです。

f:id:dasshshsd:20191201190024p:plain
書き込みの流れ

値を書き込む際にはまずインメモリデータ構造であるMemTableに書き込みを行います。この書き込みの際に、MemTableのサイズが閾値を超えている場合にはディスクに書き込みSSTableを生成します。その後、新たなMemTableを生成し今後はそこに書き込みをするようにします。

MemTableへの書き込みは競合が発生してしまうので単一のライターからしか行えません。なので、この単一のライターにはなるべく少ない仕事だけを処理させるようにする必要があり、「MemTableからSSTableを生成する処理」とライターが生成を依頼する箇所との間には非同期境界を設ける必要があります。すると、一つややこしいことが発生してしまいます。「MemTable」が「SSTable」として生成されるまでの間、その中に入っているデータに対するクエリが来た時、どうやって正しい結果を返せるでしょうか?

f:id:dasshshsd:20191201191858p:plain
SSTableを作っている間のクエリでデータが失われたように見えるケース

そのため、SSTableが生成完了するまでの間は、MemTableに対して読み取りを行う必要があります。 その結果、現在有効なログを管理するためのimmutableなデータ構造を用意しておきます。ここでログとは新規の書き込みが不可能になったMemTableとSSTableのことを指します。

f:id:dasshshsd:20191201192543p:plain
現在有効なログをimmutableなデータ構造で管理する

SSTableに対する読み込みは、ディスクアクセスが続く可能性があり、重い処理となる可能性があります。なので最新のMemTableに対象のキーが存在しなかった場合は、SSTableへの読み取りでは非同期境界を設ける必要があります。ここで有効なログがimmutableなデータ構造で管理されていることで、読み込み中に読み込み先のログが変化したりすることを防ぎます。

MemTableからSSTableの生成

では、MemTableからSSTableの生成を考えます。 既にMemTableは赤黒木などのキーでソートされる平衡木であればいいことを説明されていますが、ScalaのTreeMapは内部実装が赤黒木のソート済みツリーです。ですので自力で実装する必要はなく、MemTableはこのTreeMapのラッパーとして実装すれば十分でしょう。(キーの昇順でソートします。

/**
  * A mutable sorted map implemented using a mutable red-black tree as underlying data structure.
  *
  * @param ordering the implicit ordering used to compare objects of type `A`.
  * @tparam K the type of the keys contained in this tree map.
  * @tparam V the type of the values associated with the keys.
  *
  * @define Coll mutable.TreeMap
  * @define coll mutable tree map
  */
sealed class TreeMap[K, V] private (tree: RB.Tree[K, V])(implicit val ordering: Ordering[K])

するとTreeMapのiteratorを呼び出して、順に書き込みを行うとSSTableが生成できます。 SSTableのバイナリフォーマットは、先ほどのログベースKVSのものとほぼ同じです。

先ほどのログベースの場合と大きく異なる点は、1つのSSTableには1つのキーは1度しか登場しないということです。また、キーの昇順で書き込みが行われています。 この特性を利用すると、SSTableのキーとそのバイトオフセットをいくつかサンプリングして疎なキーインデックスを保持しておくことで、SSTableへの値の読み取りを小さな領域のスキャンに留めることができます。また、キーインデックスが疎なのでメモリ効率も高いです。

f:id:dasshshsd:20191202004040p:plain
疎なキーインデックス

上は疎なキーインデックスのイメージです。二つの配列を用意します。ある添字はサンプリングしたキーとそのバイトオフセットに対応します。キーインデックスに含まれているキーは必ずSSTableに含まれることが保証されていますが、それ以外は保証されていません。

あるキーを探したい時には、そのキー以下のキーの中で最も大きいインデックスキーと、その次のインデックスキーのバイトオフセットの間を探索します。探す対象のキー以下のキーの中で最も大きいインデックスキーは、キーインデックスに対する二分探索で求まります。これによって、キーが存在する可能性がある領域だけのディスクシークに留めることができます。キーインデックスのサンプル間隔が広ければ広いほど必要になるメモリ領域は小さくなりますが、そこは計算速度とのトレードオフとなります。

図で具体的な例をあげると、キーであるdarの値を取得したい時には、carのバイトオフセット値である134からfooの180までをディスクシークして、キーが存在すればその値を、存在しなければ存在しないことを返却すればOKです。

MemTableからの書き込みの際に、このような疎なキーインデックスを構築しながら、ディスクにバイナリフォーマットで書き込みを行なっていくことでSSTableを作成することができます。

SSTableの反映

SSTableを作成したら、読み取り時に使用できるように反映しなければなりません。 SSTableを作成している最中は、現在利用可能なログとしてMemTableを代わりに使用し、読み取りを行なっていました。なので、読み取りをMemTableからSSTableへと切り替える必要があります。これは単純に、現在有効なログを管理する不変データ構造のMemTableをSSTableに切り替えてやるだけです。

SSTableのマージとコンパクション

MemTableが一定の閾値を越えると、それがSSTableとしてディスクに書き込まれていきます。SSTableはその中で1つのキーは1度しか出現しないようになっており一定期間のコンパクションされたログだと考えることができますが、複数のSSTableをマージしてコンパクションすると、複数のSSTableから新しいSSTableを生成でき、全体としてのサイズを小さくすることができます。 各SSTableが既にキーで昇順ソートされているので、複数のSSTableをマージソートの亜種でコンパクションしながら新しい1つのSSTableにマージすることが可能です。

マージ対象のSSTableがメモリに載るサイズのものなら、古いSSTableから新しいSSTableのキーを順にソート済み平衡木に適用していけば簡単にマージすることができます。 しかし、SSTableはそのサイズが大きいことからディスクに退避しています。ディスクシークをなるべく少なく、効率的にマージとコンパクションを行いたいところです。

そこで以下のようにマージ対象のSSTableに対応するキーを1つだけ読み込んでおくバッファを用意しておきます。

f:id:dasshshsd:20191202012508p:plain

このバッファの状態として、

  • キー読み込み済み
  • EOF

の3つがあります。初期状態では空になっており、SSTableが読み込み終わっているとEOFになり、最終的には全てのSSTableに対応するバッファがEOFになるとマージが完了したと言えます。

では、このようなバッファを使ったマージ&コンパクションを説明します。

バッファを使ったマージ&コンパクション

書き込み先のSSTableを1つ用意しておきます。

ステップ1. SSTable毎にキーを読み出す

まず、空になっているバッファに対応するSSTableから、キーをバッファに読み出します。SSTableはキーの昇順ソートで書き込まれているので、キーが小さい順にバッファに読み込まれます。このステップの後、全てのバッファは読み込み済みのキーが格納されているか、EOFに到達しています。

f:id:dasshshsd:20191202013645p:plain

ステップ2. バッファ内の最小のキーの最新の値を読み出す

現在のバッファ内での最小のキーを求めます。 そして最小のキーを持つ最新のSSTableから値を読み出し、書き込み先のSSTableにバイナリフォーマットでキーと値を書き出します。 最小のキーを持つSSTableのバッファは全て空にして、読み出さなかった最小のキーを持つ各SSTableの値はスキップします。(この操作により、古い値は捨てられるのでコンパクションされている)

f:id:dasshshsd:20191202014405p:plain

そして全てのバッファがEOFに到達するまで、ステップ1とステップ2を繰り返します。

この時、バッファ分だけキーを読み込むだけなので、マージ対象のファイルがどれだけ大きくてもマージ可能です。また、ファイルポインタの明示的な移動無しで読み出し可能です。

マージが完了した際には、「SSTableの反映」と同様に、マージ対象だったSSTableを利用可能なログから削除し、マージ後のSSTableを反映させます。

このマージ自体は、各SSTableのイテレータをマージするイテレータとして実装可能で、以下のようになります。

/**
  * @param sSTables 新しい順(シーケンス番号が大きい順)に並んだSSTableのReaderのIterator
  */
class SSTableMergeIterator private (sSTables: IndexedSeq[SSTableReader])
    extends Iterator[(String, Value)] {

  private val BUFFER_IS_EMPTY = 0
  private val BUFFER_IS_FILLED = 1
  private val REACHED_EOF = 2

  private val SEGMENT_FILE_SIZE = sSTables.size
  private val emptyBuffer = Array.fill(SEGMENT_FILE_SIZE)(BUFFER_IS_EMPTY)
  private val keyBuffer = new Array[String](SEGMENT_FILE_SIZE)

  /**
    * 読み込みを行い, 次の値が存在するかどうかを確認する.
    * @return 全てのSSTableがEOFに達していれば true
    */
  override def hasNext: Boolean = {
    fulfillBuffer()
    !emptyBuffer.forall(_ == REACHED_EOF)
  }

  private def readNextOf(i: Int): Unit = {
    if (emptyBuffer(i) == BUFFER_IS_EMPTY) {
      if (sSTables(i).hasNext) {
        keyBuffer(i) = sSTables(i).readKey()
        emptyBuffer(i) = BUFFER_IS_FILLED
      } else {
        emptyBuffer(i) = REACHED_EOF
      }
    }
  }

  private def fulfillBuffer(): Unit =
    (0 until SEGMENT_FILE_SIZE) foreach readNextOf

  private def bufferFilledMinKey(): String =
    (0 until SEGMENT_FILE_SIZE)
      .filter(emptyBuffer(_) == BUFFER_IS_FILLED)
      .map(keyBuffer(_))
      .min

  /**
    * @return 現在バッファに読み込まれているキーの中で最小のキーの中で最新の値を返す.
    */
  override def next(): (String, Value) = {
    val minKey = bufferFilledMinKey()

    @tailrec
    def loop(i: Int): Value =
      if (i < SEGMENT_FILE_SIZE) {
        if (emptyBuffer(i) == BUFFER_IS_FILLED && keyBuffer(i) == minKey) {
          val latest = sSTables(i).readValue()
          emptyBuffer(i) = BUFFER_IS_EMPTY
          loopAfterLatest(i + 1, latest)
        } else {
          loop(i + 1)
        }
      } else {
        Value.Deleted // not reach this code.
      }

    @tailrec
    def loopAfterLatest(i: Int, latest: Value): Value =
      if (i < SEGMENT_FILE_SIZE) {
        if (emptyBuffer(i) == BUFFER_IS_FILLED && keyBuffer(i) == minKey) {
          sSTables(i).skipValue()
          emptyBuffer(i) = BUFFER_IS_EMPTY
        }
        loopAfterLatest(i + 1, latest)
      } else latest

    (minKey, loop(0))
  }

}

再起動時の復元

KVSが停止し、再度起動した際には、停止前の状態を復元できなくてはなりません。SSTableに関しては既にディスクに書き出しており、再起動時に読み出せば問題ありません。ただ、最後の時点でどのSSTableが有効だったのか?SSTableの新しさはどうやって判断するべきか?という問題があります。

今回はSSTableを書き出すファイル名の命名規則を決めることで、SSTableファイルの新しさを決定します。具体的にはSSTableに1つずつシーケンス番号を割り当てます。 一番最初に起動した際には、このシーケンス番号を0として割り当て、MemTableをSSTableに書き出すたびにシーケンス番号を2ずつ加算して割り当てます。 すると、SSTableには0, 2, 4, 6, ...というような飛び飛びのシーケンス番号が割り当てられていくことになります。そして現在有効なSSTableのシーケンス番号の集合を管理し、次のSSTableのシーケンス番号を統計ファイルに書き出して管理しておきます。

なぜ飛び飛びにシーケンス番号を割り当てているのか?という理由は、今回LSM-treeインデックスでのマージ戦略として、SSTableが書き出し終わったタイミングでのマージを試みるためです。 例えば現在有効なSSTableが0, 2, 4とある場合、MemTableがSSTableに書き出されて有効なSSTableとして加わると0, 2, 4, 6になります。このタイミングで一番新しいSSTableから連続するSSTableをマージして新しいSSTableを生成します。

f:id:dasshshsd:20191202133602p:plain
シーケンス番号とマージのイメージ

この時、マージする対象の最新のシーケンス番号である6に1を加えた7を新しいシーケンス番号として割り当てると、シーケンス番号が衝突せず、SSTableの新しさを管理することができます。マージ戦略が異なればまた違う管理を行う必要があるかもしれません。

という訳でSSTableは統計ファイルに書き出した情報から復元できることがわかります。 では、インメモリデータ構造であるMemTableはどうでしょうか。SSTableに書き出される前のMemTableの中身は、何もしなければ終了時には消えてしまいます。

このMemTableを復元する為の仕組みとしてWAL(Write Aheat Log)があります。これは単にバイナリフォーマットで書き込みを末尾追記しただけのコンパクションされていないログです。仕組みとしてはログベースのKVSの時とほぼ同じです。 MemTableに値を追加する時に、別にWALに書き出しておきます。すると、KVSの再起動時には、WALをリプレイしてMemTableを再構築すればいいだけなので、簡単に復元できるようになっています。WALへの書き出しは末尾に追記するだけなので高速です。

このシーケンス番号の管理と、WALによって、再起動時に前の状態を復元することができます。

(しかし、現状の戦略ではデータが損失される可能性がある箇所があります。それは、MemTableがSSTableに書き出されている最中にシステムが異常終了してしまった場合です。現状最新のMemTableに対応するWALしか用意していないので、その場合、SSTableになるはずだったMemTableのデータが消失してしまいます。これを解決するにはWALもSSTableのように複数個管理する必要がありそうです。)

アクターで組み立てる

さて、ここまでベースとなる機能の説明を行ってきました。後はこれらの機能を上手く組み立てれば完成です。しかし、SSTableの生成やマージは計算資源を分けるために非同期境界を設けたり、MemTableや現在有効なログの管理などのクリティカルセクションが存在します。これらを排他ロックを使って色々やるのはしんどいので、アクターを使ってクリティカルセクションを上手く包んでみましょう。

アクターが何か?という説明はAkka Typedのドキュメントに書いてあるので省略します。なお、今回の実装ではAkka Typed 2.6.0を使用しています。 doc.akka.io

アクターの階層は以下の図のようになります。ルートアクターがクリティカルセクションを保護しており、データの不整合状態を防ぎます。ファクトリアクターがSSTableの生成、マージを担当します。

f:id:dasshshsd:20191202134417p:plain

メッセージパッシングの概略図は以下のような感じです。

書き込みのケース

f:id:dasshshsd:20191202143257p:plain

読み取りのケース

f:id:dasshshsd:20191202144153p:plain

これでLSM-treeインデックスに基づくKVSが完成しました!

コード

コードは以下のリポジトリに全ていれてあります。 github.com