Spring Boot Bean Validationでバリデーションエラーを短絡する
Spring BootにおけるBean Validationでは HibernateValidator
がデフォルトで利用される。
例えば以下のように @Validated
アノテーションを付与することでリクエストボディのバリデーションも行うことができ、このバリデーションにも HibernateValidator
が適用されることになる。
@PostMapping("...") public void post( ... @RequestBody @Validated XXX requestBody ) { ... }
しかし、このリクエストボディのバリデーションはデフォルトの挙動で全てのバリデーションエラーを BindingResult
として報告する。
つまり、1万件のバリデーションエラーを含むリクエストボディが送信された場合には、1万件全てのバリデーションエラーを検出した上で BindException
や ConstraintViolationException
を報告することになる。
これはメモリ/CPU上のパフォーマンス上の問題点を抱えている。 バリデーションエラーが高々数十件程度であれば問題ないが、上限の推測が難しい場合には全てのバリデーションエラーを報告するのではなく、最初のエラーが確認できた時点で処理を短絡させてしまいたい。
Hibernate Validatorのfail fast mode
そのようなユースケースで、Hibernate Validatorはfail fast modeを提供している。このモードを有効化している場合、最初にバリデーションエラーが報告された時点でバリデーション処理を中断させ、エラーとする。
Hibernate Validator Configurationの hibernate.validator.fail_fast
フラグをtrueに設定すれば良いのだが、Spring Bootでこの設定を行う方法は以下のように LocalValidatorFactoryBean
を自身で作成し、Auto Configurationで設定されるものを上書きすれば良い。
@Bean public LocalValidatorFactoryBean validatorFactory() { LocalValidatorFactoryBean validatorFactory = new LocalValidatorFactoryBean(); MessageInterpolatorFactory interpolatorFactory = new MessageInterpolatorFactory(); factoryBean.setMessageInterpolator(interpolatorFactory.getObject()); factoryBean.getValidationPropertyMap().put(HibernateValidatorConfiguration.FAIL_FAST, "true"); return validatorFactory; }
- 補足
Spring Bootの
ValidationAutoConfiguration
でAuto ConfigurationされているBeanは、既にValidator.class
のBeanが存在する場合には初期化されないため、自前で定義したLocalValidatorFactoryBean
の設定が利用されるようになる。 - spring-boot/ValidationAutoConfiguration.java at main · spring-projects/spring-boot · GitHub
LocalValidatorFactoryBean
の保持するvalidationPropertyMapにプロパティを追加すると以下の箇所で設定が読み込まれる。
hadoop fs -rmコマンド コードリーディング
hadoop fs -rm
コマンドの挙動を詳しく確認する必要があり、コードリーディングを行った。
コード自体は自体は以下に存在しており、他のコマンドの実装も周辺にある。 github.com
実装上、-rm
、-rmdir
、-rmr
、-expunge
は全て異なる実装が行われているが、
今回対象としたのは -rm
だけである。
- 引数で与えられたオプションを処理 (
-r
,-f
,-R
,-skipTrash
,-safely
) - 引数で与えられたパス文字列を内部で PathData に変換 pathが見つからない場合終了
- 与えられたパスごとに以下の処理を実行。パスが存在しない場合終了
- まずディレクトリかどうかを確認。
-r
オプションが設定されていない場合にディレクトリを作成しようとすると失敗する - ゴミ箱に入れる処理を行う (hdfsでは実際にファイルを削除する前にTrashというゴミ箱に一時的に入る)
-skipTrash
オプションが存在する場合、処理をスキップfs.trash.interval
が Trashから削除される時間を制御. サーバー側で設定されている場合にはその値を優先するfs.trash.interval
が0の場合、Trashに入れずに直接削除する- Trashが有効な場合、まずhdfs上の絶対パスに直して、file statusを取得(この時点でgetfileinfoが発行される)
- 削除対象のパスを完全修飾パスに直す
- 現在のユーザーのTrashRootを取得する (実行ユーザーのHome Directoryに作成される xxx userで実行する場合 /user/xxx/.Trash )
- そして現在のTrashのパスは /user/xxx/.Trash/Current である
- もし、削除対象のパスがTrashRootに存在する場合、既にTrashに入っているので処理をスキップする
- そして逆にTrashRootを削除しようとする場合、削除できないので例外を投げる
- 削除対象のファイルがTrashで置かれるパスを計算
- 以下を最大2回行う
- fs.mkdirsでtrashRootの親ディレクトリを作成する。作成できない場合はTrash処理を終了
- ファイルが存在していて作成に失敗した場合は、現在時間をsuffixにしたディレクトリを親に作成することにして失敗を無視してリトライ
- 削除するファイルの名前で既にTrashに存在する場合も、現在時刻をsuffixにしたファイル名をTrashでのファイル名にする
- fs.renameでTrashに名前を変更して移動する(これが削除処理にあたる)
"Moved: '" + path + "' to trash at: " + trashPath
成功するとこれがログに出る- 失敗した場合、コマンドが失敗する
- まずディレクトリかどうかを確認。
- Trashに入れれた場合には処理を終了する。そうでない場合続行。
-safely
オプションが指定されている場合、巨大なディレクトリが削除されるような場合には確認を行い問題なければ削除に進む- fs.deleteを実行して、Trashをスキップして完全にファイルを削除する
また、コマンドの実行を細かく調査する際に、サーバー側のaudit logが設定されていればこれを確認することで実際に発行された操作の詳細を確認可能である。
例えば、クライアントで実行したコマンドが hadoop fs -rm -r xxx
だった場合でも実際にサーバー側に発行されているのは以下の4つのコマンドである。
20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx cmd=getfileinfo src=/xxx dst=null perm=null proto=rpc 20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx cmd=getfileinfo src=/xxx dst=null perm=null proto=rpc 20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx cmd=getfileinfo src=/xxx dst=null perm=null proto=rpc 20xx-xx-xx xx:xx:xx,xxx INFO FSNamesystem.audit: allowed=true ugi=xxx (auth:SIMPLE) ip=/xx.xxx.xxx.xx cmd=mkdirs src=/user/xxx/.Trash/Current/tmp dst=null perm=xxx:supergroup:rwx------ proto=rpc
Spring WebFlux2.4.3 - WebSocketHandlerにPrincipalを受け渡す
要約
- 現時点で、WebFluxでWebSocketをハンドリングする
WebSocketHandler
で受け取っているWebSocketSession
のHandshakeInfo
が保有するprincipleMono
は、Mono.empty()
を返している。- これは
HttpWebHandlerAdaper
が、DefaultServerWebExchange
を生成しており、spring-framework/HttpWebHandlerAdapter.java at a2ef6badc4c76790128910851fdde0df55ec15f9 · spring-projects/spring-framework · GitHub、これを元にHandshakeInfo
を作成しているためである。
- これは
- つまり、そのままだと
WebSocketHandler
では、ログイン中のユーザー情報(Principal
)を受け取ることができない。 - 解決法として、デフォルトで利用される
WebSocketService
実装を、ProtocolのUpgrade処理時にSessionをPrincipalに解決できるようにする実装に置き換える。
問題
- 現時点で、WebFluxでWebSocketをハンドリングする
WebSocketHandler
で受け取っているWebSocketSession
のHandshakeInfo
が保有するprincipleMono
は、Mono.empty()
を返している。公式としてWebFlux WebSocketはサポートしていないようで、記事を書いている現在もまだissueはOpenされたままとなっている。
そのため、以下のようにWebSocketSession
でPrincipal
の情報を取得しようとしても、完全に処理が停止してしまい、ユーザー情報とFluxを結合させることができない。
public class WebSocketEndpointHandler implements WebSocketHandler { // ... @Override public @NonNull Mono<Void> handle(WebSocketSession session) { final var fluxWithPrincipal = session.getHandshakeInfo() .getPrincipal() .flatMap(principal -> session .receive() .map(webSocketMessage -> // 各メッセージとPricipleの結合 )); } // ... }
解決策
解決策としては、ReactorNettyRequestUpgradeStrategy
をベースに、オリジナルの RequestUpgradeStrategy
を実装し、これを利用するようにWebFluxConfigurer
を設定する。
ます、ReactorNettyRequestUpgradeStrategy
をオーバーライドして、以下のようにServerSecurityContextRepository
からPrincipal
を解決するようにし、新たなHandshakeInfo
を返すようにしたものを用意する。
@RequiredArgsConstructor public class ReactorNettyRequestUpgradeStrategyWithPrincipal extends ReactorNettyRequestUpgradeStrategy { @NonNull private final ServerSecurityContextRepository serverSecurityContextRepository; @Override public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) { return super.upgrade(exchange, handler, subProtocol, () -> { final var handShakeInfo = handshakeInfoFactory.get(); final var gamePlayerPrincipal = serverSecurityContextRepository .load(exchange) .map(SecurityContext::getAuthentication) .map(Authentication::getPrincipal) .cast(GamePlayer.class) .map(player -> (Principal) player::getPlayerId); return new HandshakeInfo( handShakeInfo.getUri(), handShakeInfo.getHeaders(), gamePlayerPrincipal, handShakeInfo.getSubProtocol(), handShakeInfo.getRemoteAddress(), handShakeInfo.getAttributes(), handShakeInfo.getLogPrefix() ); }); } }
そして、自作の ReactorNettyRequestUpgradeStrategy
を利用するように WebFluxConfigurer
を設定する。
@Configuration @EnableWebFlux public class WebFluxConfig implements WebFluxConfigurer { // ... @Autowired private ServerSecurityContextRepository serverSecurityContextRepository; @Override public WebSocketService getWebSocketService() { return new HandshakeWebSocketService( new ReactorNettyRequestUpgradeStrategyWithPrincipal( serverSecurityContextRepository)); } // ... }
これにより、WebFlux WebSocketからでもSpring Securityによって解決されたユーザー情報を利用することができるようになる。
参考文献
HBase1.2 checkAndPut & scan を用いて行の論理削除を行う
要約
- HBaseの行を論理削除することを実現する方法として、checkAndPutとscanを用いて実装する
- 将来的に、複数の論理削除やフラグが追加される場合でも、拡張を容易に行うことができる
やりたいこと
- 論理削除/解除される時間はtimestampで与えられ、論理削除フラグはtimestampのlast winで最新のものが有効な値として反映される必要がある。
- つまり、ある行
r
に対してt1
->t2
->t3
の順番で発生した論理削除/解除のイベントをHBaseに適用した時に、どの順番で適用しても、最終的にはt3
の状態が反映されることが期待される。 - しかし、ある行
r
が存在している間にのみ論理削除/解除を適用したい - 論理削除されているものを取り出すか、取り出さないかは柔軟に判断して分けたい。
実装
論理削除を実行
final Table table; final byte[] rowKeyBytes; // flagを立てる対象のrow key bytes final byte[] familyBytes; // flagを立てる対象のfamily bytes final byte[] flagQualifierBytes; // flag自体のqualifier bytes final byte[] flagTimestamp; // flagのtimestamp final byte[] mustExistFamily; // 存在している必要があるデータのfamily bytes final byte[] mustExistQualifier; // 存在している必要があるデータのqualifier bytes table.checkAndPut( rowKeyBytes, mustExistFamily, mustExistQualifier, CompareOp.GREATER_OR_EQUAL, new byte[] { 0 }, new Put(rowKeyBytes) .addColumn( familyBytes, flagQualifierBytes, flagTimestamp, HConstants.EMPTY_BYTE_ARRAY ) );
対象のカラムに何か値が存在する場合にのみPutを実行する為に、入りうるバイト値の中で最小の 0
を設定し、それよりも大きいか等しいものが存在する場合のみPutを実行するように条件を設定する。
HBaseのcheckAndPutでは値が無いということ自体を表現する値が用意されていない為、このように対応している。 HConstants.EMPTY_BYTE_ARRAY
は、「空の値が存在する」表現であり、値が存在しないことを表現しない。
HBaseのcheckAndPutは、同一のFamilyに属するColumnに対してのみ条件を設定できる。これはHBaseの内部構造に起因するもので、同じFamilyに属するColumnは同じファイルに属している為、checkAndPut操作はアトミック & 高速に実行できることが期待される。 checkAndPut操作は新しいHBaseではDeprecatedになっており、checkAndMutateが推奨されている。これは複数行に対してもアトミックに操作することが可能となっているが、パフォーマンスについて懸念があるように思える。(全然調べていない)
--- 余談
今回の条件は、以下のようなnullとNOT_EQUALの組み合わせで実装できるのではないかと考えていたが、うまく動作せずどのような場合でもPutが成功してしまった。
table.checkAndPut( rowKeyBytes, mustExistFamily, mustExistQualifier, CompareOp.NOT_EQUAL, null, new Put(rowKeyBytes) .addColumn( familyBytes, flagQualifierBytes, flagTimestamp, HConstants.EMPTY_BYTE_ARRAY ) );
論理削除の解除を実行
final Table table; final byte[] rowKeyBytes; // flagを立てる対象のrow key bytes final byte[] familyBytes; // flagを立てる対象のfamily bytes final byte[] flagQualifierBytes; // flag自体のqualifier bytes final byte[] flagTimestamp; // flag解除のtimestamp table.delete( new Delete(rowKeyBytes) .addColumns(familyBytes, flagQualifierBytes, flagTimestamp) );
なお、.addColumns(familyBytes, flagQualifierBytes, flagTimestamp)
でtimestampを指定しているが、この場合timestamp以下のcolumnは全て削除される。
論理削除を除去したscan
final Table table; // ... final byte[] flagQualifierBytes; // flag自体のqualifier bytes final Scan scan = new Scan(); // add scan configurations // ... final var logicallyDeleteQualifierSkipFilter = new SkipFilter(new QualifierFilter( CompareOp.NOT_EQUAL, new BinaryComparator(flagQualifierBytes))); scan.setFilter(logicallyDeleteQualifierSkipFilter); table.getScanner(scan);
new BinaryComparator(flagQualifierBytes)
だけの場合、対象のQualifierのみがフィルタされ、行自体と他のQualifierについてはフィルタされない。 SkipFilter
を組み合わせることで、条件に一つでもマッチした場合に行自体のscanをスキップする挙動になる。
別途 FilterList
などを組み合わせることで、複数のフラグを組み合わせたscan条件を構築することが可能。
フラグの追加側は、同じFamily内で複数のフラグ用Columnを用意すれば、同様に実装できる。
論理削除を含む通常のscan
フィルタを特別に設定しなければ、scan対象になる。
MySQL 5.7のsys.innodb_lock_waitsを可視化する
要約
- MySQLには 現在トランザクションが待機しているロックの要約を見るためのsys.innodb_lock_waitsビューが定義されている。(https://dev.mysql.com/doc/refman/5.7/en/sys-innodb-lock-waits.html)
- アプリケーションでlock timeoutの前兆を検出し、sys.innodb_lock_waitsに対するselectを発行することで、lock timeoutの原因になっているクエリを特定することができる。
- sys.innodb_lock_waitsの結果は、人間が見た時に非常にわかりづらく、複雑な依存関係が発生している場合には解読が困難。
- sys.innodb_lock_waitsの結果を、dot言語に変換し、graphvizでレンダリングすれば分かりやすい結果を得ることができる。
コード
- 入力をparseする箇所は適宜適切な形に読み換える必要がある。(このコードでは、
{key=value, key=value, ..., key=value}, ... {key=value, key=value, ..., key=value}
のような形式でsys.innodb_lock_waitsのレコードを読み取ることを想定している)
結果例
- 実際に取得できた結果の例を示す
UUID符号空間を等分割して使う
概要
- 16進表記のUUIDの符号空間は
00000000-0000-0000-0000-000000000000
~ffffffff-ffff-ffff-ffff-ffffffffffff
で表せる - HBaseのテーブルのRow Keyが
${prefix}${区切り文字}${16進表記UUID}
のような形式になっているような場合、${prefix}${区切り文字}
~${prefix}${${区切り文字のbyte値}+1}
の間をSCAN操作することで${prefix}
に属する全てのUUIDを一度にSCANすることができる。 - しかし、一度にSCANする対象になるキーの数がはるかに大きい場合、符号空間を適切に分割した範囲で分割してSCANすることで、分割した範囲ごとに並列処理を行うことや、HBaseに対する負荷の軽減が期待できる。
- 今回は、UUIDのケースに限って、クライアント側でSCANするキーの符号空間を適切に分割して返すコードを記述した。
実装方法
キー領域をDFSのように深掘りして分割していく。
HBaseのKeyはstopKeyがexclusiveなので、最終的に分割された領域のstopKeyのbyte値を+1をしたものをstopKeyとして使えば良い。
x個の領域に分割したい場合、n (2 ^ a and n >= x)
のn個の領域に分割される事になる。
実装
注意点
Javaでの処理とHBaseに格納する文字列のエンコーディングはUTF-8を使用するものとする。ここで、'a'という文字が'9'という文字の値より大きいことを確認する必要がある。 (分割した領域でキーの値が単調増加する必要がある為)
assertThat('a' > '9').isTrue();
発展
- なお、決められた符号空間によって表現されるRow Keyであれば、UUIDだけでなく他のものも同様の方法で分割できるような場合がある。
- 実際には、負荷が高いSCANを行う場合にはMapReduceを検討すべきだが、クライアント側での分割SCANは柔軟性が高く、導入コストが楽である。
- HBaseテーブルのRow Key Pre-Splitのように、事前に分割数が固定されていれば、最初から分割したキー領域を定義しておくことでより詳細に範囲を定めておくことが可能であるが、汎用的な分割数に対応することは難しい。
JavaのLinkedBlockingQueueを使ってみる
import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class Work { public static void main(String[] args) { final var queueSize = 1; final var queue = new LinkedBlockingQueue<String>(queueSize); final var isDone = new AtomicBoolean(false); final var es1 = Executors.newSingleThreadExecutor(); final var es2 = Executors.newSingleThreadExecutor(); final var w1 = CompletableFuture.runAsync(() -> { List.of("foo", "bar", "baz").forEach(str -> { try { queue.put(str); System.out.println("put \"" + str + '"'); Thread.sleep(3000); } catch (InterruptedException e) { isDone.set(true); throw new RuntimeException(e); } }); isDone.set(true); }, es1); final var w2 = CompletableFuture.runAsync(() -> { while (!isDone.get() || !queue.isEmpty()) { try { final var str = queue.poll(1, TimeUnit.SECONDS); System.out.println(isDone.get()); if (str == null) { continue; } System.out.println("take \"" + str + '"'); } catch (InterruptedException e) { w1.cancel(true); throw new RuntimeException(e); } } }, es2); CompletableFuture.allOf(w1, w2).whenComplete((v, e) -> { es1.shutdown(); es2.shutdown(); }).join(); } }