Fluentdのout_http pluginでapplication/x-www-form-urlencoded形式のリクエストを送信する
概要
v1.7.0以降のFluentdでデフォルトでバンドルされているout_httpプラグインを利用すると、 HTTP/HTTPS経由でレコードを送信することができる。
out_httpを利用し、
デフォルトで利用可能なformat pluginとContent-Typeの対応
out_httpで送信可能なContent-Typeは、利用可能なformatプラグインに依存しているため、デフォルトでは以下の形式が対応している。
json
:application/json
orapplication/x-ndjson
ltsv
,tsv
:text/csv
csv
:text/csv
msgpack
:application/x-msgpack
out_file
,single_value
,hash
,stdout
:text/plain
一見、application/x-www-form-urlencoded
での送信ができないように見えるが、
ltsv
プラグインを利用することで対応が可能である。
ltsvプラグインによる application/x-www-form-urlencoded 形式でのリクエスト送信
ltsvプラグインには、delimiter
とlabel_delimiter
のパラメータを渡すことができ、それぞれフィールドのデリミタとラベルのデリミタをカスタムできる。
ltsv形式であるため、デフォルトではそれぞれ\t
、:
だが、
今回目的を達成するために以下のような設定を行う。
<match> @type http endpoint http://xxx open_timeout 5 read_timeout 5 content_type application/x-www-form-urlencoded retryable_response_codes 503 ... <format> @type ltsv delimiter "&" label_delimiter "=" add_newline false </format> </match>
delimiter
に&
、label_delimiter
に=
を設定し、content_type
に application/x-www-form-urlencoded
を設定する。
add_newline
は、レコードの最後に改行を入れるか入れないかを設定するものであり、ここではfalseとしておく。
これだけでは不十分でout_httpで送信されるリクエストは、 Fluentdのバッファで管理されているチャンク毎にまとめてリクエストすることになる。 これはout_httpがバッファリングを行うバッファOutputプラグインであるためであり、 バッファを通さずにリクエストする方法は提供されていない。
そのため、単一のレコードのみを受け付けるような既存のAPIに上手く適合させるには、 リクエストが単一のレコードのみを含むことを保証させる必要がある。
バッファプラグイン設定により単一レコードでのリクエストを保証
先ほどの設定に、新たにbufferディレクティブを追加しています。
<match> @type http endpoint http://xxx open_timeout 5 read_timeout 5 content_type application/x-www-form-urlencoded retryable_response_codes 503 <buffer uniqueId> @type memory queued_chunks_limit_size 100 flush_mode immediate flush_thread_count 8 chunk_limit_records 1 total_limit_size 512m overflow_action block </buffer> <format> @type ltsv delimiter "&" label_delimiter "=" add_newline false </format> </match>
バッファプラグインとしてmemoryを利用し、flush_mode
をimmediate、chunk_limit_records
を1、overflow_action
をblockとすることで、リクエスト単位をまとめるチャンクの中に含まれるレコードを1つに限定し、チャンクに1件追加された時点で即時フラッシュを行い、フラッシュが詰まっている場合に後続のデータが到着した場合でもブロックするようにします。
bufferディレクティブに渡しているuniqueIdは、適当にユニークなIDを指定します。
完全にユニークである必要はありません。
チャンクの中に入るレコードが1つであるため、これをユニークにすることで、レコード毎に異なるチャンクに格納されることになります。
加えて、デフォルトが1であるqueued_chunks_limit_size
/flush_thread_count
を増加させることで、リクエストの並列度を増加させます。
もしqueued_chunks_limit_size
が1の場合、1つのレコードが送信完了するまで新しいチャンクに新しいレコードを入れることができず、リクエスト送信でボトルネックになってしまいます。
ここでは仮に最大100個のチャンクを許容していますが、例えばバッファプラグインがfileだと100つのチャンクがファイルに読み書きすることになり、ディスクIOが余計に多くなってしまいます。 これを回避するためにmemoryとしていますが、リクエストの送信に失敗したり途中でFluentdがダウンすることで、チャンクに入れられたレコードがロストする可能性があることに注意してください。 ロストが許容できない場合にはこの選択はできません。
ここまでで、単一のレコード毎にHTTPでapplication/x-www-form-urlencoded形式のリクエストが可能になったように見えます。 まだこれでは不十分で、out_httpは送信するレコードのフィールドの値をURIエンコードしないという問題があります。 例えば、送信するレコードの中にスペースを含んでいたり日本語を含んでいる場合は、受信側のAPI側で正しくリクエストボディを解釈することができません。
Fluentdのrecord_transformerでフィールドをURIエンコードする
事前に送信するフィールドの値をrecord_transformer filterプラグインでURIエンコードしておきます。
<filter> @type record_transformer enable_ruby <record> ... text ${require "uri"; URI.encode_www_form_component(record["text"])} ... </record> </filter>
RubyのURIモジュールを読み込み、encode_www_form_component
を呼び出すことで個別のフィールドについてURIエンコードを適用します。
enable_rubyによる変換は比較的パフォーマンスが低いということもあり、
追加のプラグインが利用可能な場合にはそちらを優先する必要があります。
まとめ
デフォルトでバンドルされているout_httpプラグインでも、ltsvフォーマットプラグインをカスタムすれば、レコードをapplication/x-www-form-urlencoded形式にフォーマットすることが可能です。 バッファプラグインをカスタムすることで、単一のレコードにつき単一のリクエストで送信することができ、既存のAPIに対して連携を行う場合にも適用できます。 各フィールドがURIエンコードが必要な場合は、record_transformerプラグインでRubyのURIエンコードを実施することで適用できます。
いずれにせよ、このような設定で対応する場合は、レコードのロストやパフォーマンス上の懸念がない場合に適用可能です。必要に応じて追加のプラグインの利用や開発を検討する必要があります。