Spring Boot Bean Validationでバリデーションエラーを短絡する

Spring BootにおけるBean Validationでは HibernateValidator がデフォルトで利用される。 例えば以下のように @Validated アノテーションを付与することでリクエストボディのバリデーションも行うことができ、このバリデーションにも HibernateValidator が適用されることになる。

    @PostMapping("...")
    public void post(
            ...
            @RequestBody @Validated XXX requestBody
    ) {
        ...
    }

しかし、このリクエストボディのバリデーションはデフォルトの挙動で全てのバリデーションエラーを BindingResult として報告する。 つまり、1万件のバリデーションエラーを含むリクエストボディが送信された場合には、1万件全てのバリデーションエラーを検出した上で BindExceptionConstraintViolationException を報告することになる。

これはメモリ/CPU上のパフォーマンス上の問題点を抱えている。 バリデーションエラーが高々数十件程度であれば問題ないが、上限の推測が難しい場合には全てのバリデーションエラーを報告するのではなく、最初のエラーが確認できた時点で処理を短絡させてしまいたい。

Hibernate Validatorのfail fast mode

そのようなユースケースで、Hibernate Validatorはfail fast modeを提供している。このモードを有効化している場合、最初にバリデーションエラーが報告された時点でバリデーション処理を中断させ、エラーとする。

Hibernate Validator Configurationの hibernate.validator.fail_fast フラグをtrueに設定すれば良いのだが、Spring Bootでこの設定を行う方法は以下のように LocalValidatorFactoryBean を自身で作成し、Auto Configurationで設定されるものを上書きすれば良い。

    @Bean
    public LocalValidatorFactoryBean validatorFactory() {
        LocalValidatorFactoryBean validatorFactory = new LocalValidatorFactoryBean();
        MessageInterpolatorFactory interpolatorFactory = new MessageInterpolatorFactory();
        factoryBean.setMessageInterpolator(interpolatorFactory.getObject());
        factoryBean.getValidationPropertyMap().put(HibernateValidatorConfiguration.FAIL_FAST, "true");
        return validatorFactory;
    }

LocalValidatorFactoryBeanの保持するvalidationPropertyMapにプロパティを追加すると以下の箇所で設定が読み込まれる。

hadoop fs -rmコマンド コードリーディング

hadoop fs -rm コマンドの挙動を詳しく確認する必要があり、コードリーディングを行った。

コード自体は自体は以下に存在しており、他のコマンドの実装も周辺にある。 github.com

実装上、-rm-rmdir-rmr-expungeは全て異なる実装が行われているが、 今回対象としたのは -rm だけである。

  • 引数で与えられたオプションを処理 (-r, -f, -R, -skipTrash, -safely)
  • 引数で与えられたパス文字列を内部で PathData に変換 pathが見つからない場合終了
  • 与えられたパスごとに以下の処理を実行。パスが存在しない場合終了
    • まずディレクトリかどうかを確認。 -r オプションが設定されていない場合にディレクトリを作成しようとすると失敗する
    • ゴミ箱に入れる処理を行う (hdfsでは実際にファイルを削除する前にTrashというゴミ箱に一時的に入る)
    • -skipTrash オプションが存在する場合、処理をスキップ
    • fs.trash.interval が Trashから削除される時間を制御. サーバー側で設定されている場合にはその値を優先する
    • fs.trash.interval が0の場合、Trashに入れずに直接削除する
    • Trashが有効な場合、まずhdfs上の絶対パスに直して、file statusを取得(この時点でgetfileinfoが発行される)
    • 削除対象のパスを完全修飾パスに直す
    • 現在のユーザーのTrashRootを取得する (実行ユーザーのHome Directoryに作成される xxx userで実行する場合 /user/xxx/.Trash )
    • そして現在のTrashのパスは /user/xxx/.Trash/Current である
    • もし、削除対象のパスがTrashRootに存在する場合、既にTrashに入っているので処理をスキップする
    • そして逆にTrashRootを削除しようとする場合、削除できないので例外を投げる
    • 削除対象のファイルがTrashで置かれるパスを計算
    • 以下を最大2回行う
      • fs.mkdirsでtrashRootの親ディレクトリを作成する。作成できない場合はTrash処理を終了
      • ファイルが存在していて作成に失敗した場合は、現在時間をsuffixにしたディレクトリを親に作成することにして失敗を無視してリトライ
      • 削除するファイルの名前で既にTrashに存在する場合も、現在時刻をsuffixにしたファイル名をTrashでのファイル名にする
      • fs.renameでTrashに名前を変更して移動する(これが削除処理にあたる)
      • "Moved: '" + path + "' to trash at: " + trashPath 成功するとこれがログに出る
      • 失敗した場合、コマンドが失敗する
  • Trashに入れれた場合には処理を終了する。そうでない場合続行。
  • -safely オプションが指定されている場合、巨大なディレクトリが削除されるような場合には確認を行い問題なければ削除に進む
  • fs.deleteを実行して、Trashをスキップして完全にファイルを削除する

また、コマンドの実行を細かく調査する際に、サーバー側のaudit logが設定されていればこれを確認することで実際に発行された操作の詳細を確認可能である。

例えば、クライアントで実行したコマンドが hadoop fs -rm -r xxx だった場合でも実際にサーバー側に発行されているのは以下の4つのコマンドである。

20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true   ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx       cmd=getfileinfo src=/xxx        dst=null        perm=null       proto=rpc
20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true   ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx       cmd=getfileinfo src=/xxx       dst=null        perm=null       proto=rpc
20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true   ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx       cmd=getfileinfo src=/xxx       dst=null        perm=null       proto=rpc
20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true   ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx       cmd=mkdirs      src=/user/xxx/.Trash/Current/tmp      dst=null        perm=xxx:supergroup:rwx------ proto=rpc

Spring WebFlux2.4.3 - WebSocketHandlerにPrincipalを受け渡す

要約

  • 現時点で、WebFluxでWebSocketをハンドリングするWebSocketHandlerで受け取っているWebSocketSessionHandshakeInfo保有する principleMonoは、Mono.empty()を返している。
  • つまり、そのままだとWebSocketHandlerでは、ログイン中のユーザー情報(Principal)を受け取ることができない。
  • 解決法として、デフォルトで利用されるWebSocketService実装を、ProtocolのUpgrade処理時にSessionをPrincipalに解決できるようにする実装に置き換える。

問題

そのため、以下のようにWebSocketSessionPrincipalの情報を取得しようとしても、完全に処理が停止してしまい、ユーザー情報とFluxを結合させることができない。

public class WebSocketEndpointHandler implements WebSocketHandler {

// ...

    @Override
    public @NonNull Mono<Void> handle(WebSocketSession session) {
        final var fluxWithPrincipal = session.getHandshakeInfo()
                .getPrincipal()
                .flatMap(principal -> session
                        .receive()
                        .map(webSocketMessage ->
                                // 各メッセージとPricipleの結合
                                ));
    }

// ...

}

解決策

解決策としては、ReactorNettyRequestUpgradeStrategy をベースに、オリジナルの RequestUpgradeStrategy を実装し、これを利用するようにWebFluxConfigurerを設定する。

ます、ReactorNettyRequestUpgradeStrategyをオーバーライドして、以下のようにServerSecurityContextRepositoryからPrincipalを解決するようにし、新たなHandshakeInfoを返すようにしたものを用意する。

@RequiredArgsConstructor
public class ReactorNettyRequestUpgradeStrategyWithPrincipal extends ReactorNettyRequestUpgradeStrategy {

    @NonNull
    private final ServerSecurityContextRepository serverSecurityContextRepository;

    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
                              @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
        return super.upgrade(exchange, handler, subProtocol, () -> {
            final var handShakeInfo = handshakeInfoFactory.get();
            final var gamePlayerPrincipal = serverSecurityContextRepository
                    .load(exchange)
                    .map(SecurityContext::getAuthentication)
                    .map(Authentication::getPrincipal)
                    .cast(GamePlayer.class)
                    .map(player -> (Principal) player::getPlayerId);

            return new HandshakeInfo(
                    handShakeInfo.getUri(),
                    handShakeInfo.getHeaders(),
                    gamePlayerPrincipal,
                    handShakeInfo.getSubProtocol(),
                    handShakeInfo.getRemoteAddress(),
                    handShakeInfo.getAttributes(),
                    handShakeInfo.getLogPrefix()
            );
        });
    }

}

そして、自作の ReactorNettyRequestUpgradeStrategy を利用するように WebFluxConfigurer を設定する。

@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {

// ...

    @Autowired
    private ServerSecurityContextRepository serverSecurityContextRepository;

    @Override
    public WebSocketService getWebSocketService() {
        return new HandshakeWebSocketService(
                new ReactorNettyRequestUpgradeStrategyWithPrincipal(
                        serverSecurityContextRepository));
    }

// ...

}

これにより、WebFlux WebSocketからでもSpring Securityによって解決されたユーザー情報を利用することができるようになる。

参考文献

HBase1.2 checkAndPut & scan を用いて行の論理削除を行う

要約

  • HBaseの行を論理削除することを実現する方法として、checkAndPutとscanを用いて実装する
  • 将来的に、複数の論理削除やフラグが追加される場合でも、拡張を容易に行うことができる

やりたいこと

  • 論理削除/解除される時間はtimestampで与えられ、論理削除フラグはtimestampのlast winで最新のものが有効な値として反映される必要がある。
  • つまり、ある行rに対してt1 -> t2 -> t3の順番で発生した論理削除/解除のイベントをHBaseに適用した時に、どの順番で適用しても、最終的にはt3の状態が反映されることが期待される。
  • しかし、ある行rが存在している間にのみ論理削除/解除を適用したい
  • 論理削除されているものを取り出すか、取り出さないかは柔軟に判断して分けたい。

実装

論理削除を実行

final Table table;
final byte[] rowKeyBytes; // flagを立てる対象のrow key bytes
final byte[] familyBytes; // flagを立てる対象のfamily bytes
final byte[] flagQualifierBytes; // flag自体のqualifier bytes
final byte[] flagTimestamp; // flagのtimestamp
final byte[] mustExistFamily; // 存在している必要があるデータのfamily bytes
final byte[] mustExistQualifier; // 存在している必要があるデータのqualifier bytes

table.checkAndPut(
        rowKeyBytes,
        mustExistFamily,
        mustExistQualifier,
        CompareOp.GREATER_OR_EQUAL,
        new byte[] { 0 },
        new Put(rowKeyBytes)
                .addColumn(
                        familyBytes,
                        flagQualifierBytes,
                        flagTimestamp,
                        HConstants.EMPTY_BYTE_ARRAY
                )
);

対象のカラムに何か値が存在する場合にのみPutを実行する為に、入りうるバイト値の中で最小の 0 を設定し、それよりも大きいか等しいものが存在する場合のみPutを実行するように条件を設定する。

HBaseのcheckAndPutでは値が無いということ自体を表現する値が用意されていない為、このように対応している。 HConstants.EMPTY_BYTE_ARRAY は、「空の値が存在する」表現であり、値が存在しないことを表現しない。

HBaseのcheckAndPutは、同一のFamilyに属するColumnに対してのみ条件を設定できる。これはHBaseの内部構造に起因するもので、同じFamilyに属するColumnは同じファイルに属している為、checkAndPut操作はアトミック & 高速に実行できることが期待される。 checkAndPut操作は新しいHBaseではDeprecatedになっており、checkAndMutateが推奨されている。これは複数行に対してもアトミックに操作することが可能となっているが、パフォーマンスについて懸念があるように思える。(全然調べていない)

--- 余談

今回の条件は、以下のようなnullとNOT_EQUALの組み合わせで実装できるのではないかと考えていたが、うまく動作せずどのような場合でもPutが成功してしまった。

table.checkAndPut(
        rowKeyBytes,
        mustExistFamily,
        mustExistQualifier,
        CompareOp.NOT_EQUAL,
        null,
        new Put(rowKeyBytes)
                .addColumn(
                        familyBytes,
                        flagQualifierBytes,
                        flagTimestamp,
                        HConstants.EMPTY_BYTE_ARRAY
                )
);

論理削除の解除を実行

final Table table;
final byte[] rowKeyBytes; // flagを立てる対象のrow key bytes
final byte[] familyBytes; // flagを立てる対象のfamily bytes
final byte[] flagQualifierBytes; // flag自体のqualifier bytes
final byte[] flagTimestamp; // flag解除のtimestamp

table.delete(
        new Delete(rowKeyBytes)
                .addColumns(familyBytes, flagQualifierBytes, flagTimestamp)
);

なお、.addColumns(familyBytes, flagQualifierBytes, flagTimestamp) でtimestampを指定しているが、この場合timestamp以下のcolumnは全て削除される。

論理削除を除去したscan

final Table table;
// ...
final byte[] flagQualifierBytes; // flag自体のqualifier bytes

final Scan scan = new Scan();
// add scan configurations
// ...
final var logicallyDeleteQualifierSkipFilter =
        new SkipFilter(new QualifierFilter(
                CompareOp.NOT_EQUAL, new BinaryComparator(flagQualifierBytes)));
scan.setFilter(logicallyDeleteQualifierSkipFilter);

table.getScanner(scan);

new BinaryComparator(flagQualifierBytes) だけの場合、対象のQualifierのみがフィルタされ、行自体と他のQualifierについてはフィルタされない。 SkipFilterを組み合わせることで、条件に一つでもマッチした場合に行自体のscanをスキップする挙動になる。

別途 FilterListなどを組み合わせることで、複数のフラグを組み合わせたscan条件を構築することが可能。 フラグの追加側は、同じFamily内で複数のフラグ用Columnを用意すれば、同様に実装できる。

論理削除を含む通常のscan

フィルタを特別に設定しなければ、scan対象になる。

MySQL 5.7のsys.innodb_lock_waitsを可視化する

要約

  • MySQLには 現在トランザクションが待機しているロックの要約を見るためのsys.innodb_lock_waitsビューが定義されている。(https://dev.mysql.com/doc/refman/5.7/en/sys-innodb-lock-waits.html)
  • アプリケーションでlock timeoutの前兆を検出し、sys.innodb_lock_waitsに対するselectを発行することで、lock timeoutの原因になっているクエリを特定することができる。
  • sys.innodb_lock_waitsの結果は、人間が見た時に非常にわかりづらく、複雑な依存関係が発生している場合には解読が困難。
  • sys.innodb_lock_waitsの結果を、dot言語に変換し、graphvizレンダリングすれば分かりやすい結果を得ることができる。

コード

  • 入力をparseする箇所は適宜適切な形に読み換える必要がある。(このコードでは、{key=value, key=value, ..., key=value}, ... {key=value, key=value, ..., key=value}のような形式でsys.innodb_lock_waitsのレコードを読み取ることを想定している)

gist.github.com

結果例

  • 実際に取得できた結果の例を示す

gist.github.com

f:id:dasshshsd:20201027001340p:plain
graphvizレンダリングした結果

UUID符号空間を等分割して使う

概要

  • 16進表記のUUIDの符号空間は 00000000-0000-0000-0000-000000000000 ~ ffffffff-ffff-ffff-ffff-ffffffffffffで表せる
  • HBaseのテーブルのRow Keyが ${prefix}${区切り文字}${16進表記UUID} のような形式になっているような場合、${prefix}${区切り文字} ~ ${prefix}${${区切り文字のbyte値}+1}の間をSCAN操作することで${prefix}に属する全てのUUIDを一度にSCANすることができる。
  • しかし、一度にSCANする対象になるキーの数がはるかに大きい場合、符号空間を適切に分割した範囲で分割してSCANすることで、分割した範囲ごとに並列処理を行うことや、HBaseに対する負荷の軽減が期待できる。
  • 今回は、UUIDのケースに限って、クライアント側でSCANするキーの符号空間を適切に分割して返すコードを記述した。

実装方法

キー領域をDFSのように深掘りして分割していく。 HBaseのKeyはstopKeyがexclusiveなので、最終的に分割された領域のstopKeyのbyte値を+1をしたものをstopKeyとして使えば良い。 x個の領域に分割したい場合、n (2 ^ a and n >= x) のn個の領域に分割される事になる。

f:id:dasshshsd:20200929210704p:plain
分割イメージ

実装

gist.github.com

注意点

Javaでの処理とHBaseに格納する文字列のエンコーディングUTF-8を使用するものとする。ここで、'a'という文字が'9'という文字の値より大きいことを確認する必要がある。 (分割した領域でキーの値が単調増加する必要がある為)

assertThat('a' > '9').isTrue();

発展

  • なお、決められた符号空間によって表現されるRow Keyであれば、UUIDだけでなく他のものも同様の方法で分割できるような場合がある。
  • 実際には、負荷が高いSCANを行う場合にはMapReduceを検討すべきだが、クライアント側での分割SCANは柔軟性が高く、導入コストが楽である。
  • HBaseテーブルのRow Key Pre-Splitのように、事前に分割数が固定されていれば、最初から分割したキー領域を定義しておくことでより詳細に範囲を定めておくことが可能であるが、汎用的な分割数に対応することは難しい。

JavaのLinkedBlockingQueueを使ってみる

f:id:dasshshsd:20200508193609p:plain
やりたいこと

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Work {

    public static void main(String[] args) {
        final var queueSize = 1;
        final var queue = new LinkedBlockingQueue<String>(queueSize);
        final var isDone = new AtomicBoolean(false);
        final var es1 = Executors.newSingleThreadExecutor();
        final var es2 = Executors.newSingleThreadExecutor();

        final var w1 = CompletableFuture.runAsync(() -> {
            List.of("foo", "bar", "baz").forEach(str -> {
                try {
                    queue.put(str);
                    System.out.println("put \"" + str + '"');
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    isDone.set(true);
                    throw new RuntimeException(e);
                }
            });
            isDone.set(true);
        }, es1);

        final var w2 = CompletableFuture.runAsync(() -> {
            while (!isDone.get() || !queue.isEmpty()) {
                try {
                    final var str = queue.poll(1, TimeUnit.SECONDS);
                    System.out.println(isDone.get());
                    if (str == null) {
                        continue;
                    }
                    System.out.println("take \"" + str + '"');
                } catch (InterruptedException e) {
                    w1.cancel(true);
                    throw new RuntimeException(e);
                }
            }
        }, es2);

        CompletableFuture.allOf(w1, w2).whenComplete((v, e) -> {
            es1.shutdown();
            es2.shutdown();
        }).join();
    }

}