Fluentdのout_http pluginでapplication/x-www-form-urlencoded形式のリクエストを送信する

概要

v1.7.0以降のFluentdでデフォルトでバンドルされているout_httpプラグインを利用すると、 HTTP/HTTPS経由でレコードを送信することができる。

out_httpを利用し、

  • application/x-www-form-urlencoded形式のリクエストを送信する方法について記述する。
  • 単一のレコードにつき単一のリクエストで送信する方法について記述する。

デフォルトで利用可能なformat pluginとContent-Typeの対応

out_httpで送信可能なContent-Typeは、利用可能なformatプラグインに依存しているため、デフォルトでは以下の形式が対応している。

  • json: application/json or application/x-ndjson
  • ltsv,tsv: text/csv
  • csv: text/csv
  • msgpack: application/x-msgpack
  • out_file,single_value,hash,stdout: text/plain

一見、application/x-www-form-urlencoded での送信ができないように見えるが、 ltsvプラグインを利用することで対応が可能である。

ltsvプラグインによる application/x-www-form-urlencoded 形式でのリクエスト送信

ltsvプラグインには、delimiterlabel_delimiterのパラメータを渡すことができ、それぞれフィールドのデリミタとラベルのデリミタをカスタムできる。

ltsv形式であるため、デフォルトではそれぞれ\t:だが、 今回目的を達成するために以下のような設定を行う。

<match>
  @type http
  endpoint http://xxx
  open_timeout 5
  read_timeout 5
  content_type application/x-www-form-urlencoded
  retryable_response_codes 503
  ...
  <format>
    @type ltsv
    delimiter "&"
    label_delimiter "="
    add_newline false
  </format>
</match>

delimiter&label_delimiter=を設定し、content_typeapplication/x-www-form-urlencoded を設定する。 add_newlineは、レコードの最後に改行を入れるか入れないかを設定するものであり、ここではfalseとしておく。

これだけでは不十分でout_httpで送信されるリクエストは、 Fluentdのバッファで管理されているチャンク毎にまとめてリクエストすることになる。 これはout_httpがバッファリングを行うバッファOutputプラグインであるためであり、 バッファを通さずにリクエストする方法は提供されていない。

そのため、単一のレコードのみを受け付けるような既存のAPIに上手く適合させるには、 リクエストが単一のレコードのみを含むことを保証させる必要がある。

バッファプラグイン設定により単一レコードでのリクエストを保証

先ほどの設定に、新たにbufferディレクティブを追加しています。

<match>
  @type http
  endpoint http://xxx
  open_timeout 5
  read_timeout 5
  content_type application/x-www-form-urlencoded
  retryable_response_codes 503
  <buffer uniqueId>
    @type memory
    queued_chunks_limit_size 100
    flush_mode immediate
    flush_thread_count 8
    chunk_limit_records 1
    total_limit_size 512m
    overflow_action block
  </buffer>
  <format>
    @type ltsv
    delimiter "&"
    label_delimiter "="
    add_newline false
  </format>
</match>

バッファプラグインとしてmemoryを利用し、flush_modeをimmediate、chunk_limit_recordsを1、overflow_actionをblockとすることで、リクエスト単位をまとめるチャンクの中に含まれるレコードを1つに限定し、チャンクに1件追加された時点で即時フラッシュを行い、フラッシュが詰まっている場合に後続のデータが到着した場合でもブロックするようにします。

bufferディレクティブに渡しているuniqueIdは、適当にユニークなIDを指定します。 完全にユニークである必要はありません。 チャンクの中に入るレコードが1つであるため、これをユニークにすることで、レコード毎に異なるチャンクに格納されることになります。 加えて、デフォルトが1であるqueued_chunks_limit_size/flush_thread_countを増加させることで、リクエストの並列度を増加させます。 もしqueued_chunks_limit_sizeが1の場合、1つのレコードが送信完了するまで新しいチャンクに新しいレコードを入れることができず、リクエスト送信でボトルネックになってしまいます。

ここでは仮に最大100個のチャンクを許容していますが、例えばバッファプラグインがfileだと100つのチャンクがファイルに読み書きすることになり、ディスクIOが余計に多くなってしまいます。 これを回避するためにmemoryとしていますが、リクエストの送信に失敗したり途中でFluentdがダウンすることで、チャンクに入れられたレコードがロストする可能性があることに注意してください。 ロストが許容できない場合にはこの選択はできません。

ここまでで、単一のレコード毎にHTTPでapplication/x-www-form-urlencoded形式のリクエストが可能になったように見えます。 まだこれでは不十分で、out_httpは送信するレコードのフィールドの値をURIエンコードしないという問題があります。 例えば、送信するレコードの中にスペースを含んでいたり日本語を含んでいる場合は、受信側のAPI側で正しくリクエストボディを解釈することができません。

Fluentdのrecord_transformerでフィールドをURIエンコードする

事前に送信するフィールドの値をrecord_transformer filterプラグインURIエンコードしておきます。

<filter>
  @type record_transformer
  enable_ruby
  <record>
    ...
    text ${require "uri"; URI.encode_www_form_component(record["text"])}
    ...
  </record>
</filter>

RubyURIモジュールを読み込み、encode_www_form_componentを呼び出すことで個別のフィールドについてURIエンコードを適用します。 enable_rubyによる変換は比較的パフォーマンスが低いということもあり、 追加のプラグインが利用可能な場合にはそちらを優先する必要があります。

まとめ

デフォルトでバンドルされているout_httpプラグインでも、ltsvフォーマットプラグインをカスタムすれば、レコードをapplication/x-www-form-urlencoded形式にフォーマットすることが可能です。 バッファプラグインをカスタムすることで、単一のレコードにつき単一のリクエストで送信することができ、既存のAPIに対して連携を行う場合にも適用できます。 各フィールドがURIエンコードが必要な場合は、record_transformerプラグインRubyURIエンコードを実施することで適用できます。

いずれにせよ、このような設定で対応する場合は、レコードのロストやパフォーマンス上の懸念がない場合に適用可能です。必要に応じて追加のプラグインの利用や開発を検討する必要があります。