Striim3.10.3ドキュメント
Kafka Writer
Apache Kafkaのトピックに書き込みます。KafkaWriterには、0.8.0、0.9.0、0.10.0、0.11.0、および2.1.0の5つのバージョンがあります。 ターゲットKafkaブローカーに対応するものを使用します。 たとえば、0.9.0を使用する場合の構文はCREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'
です。 内部Kafkaインスタンスに書き込む場合は、0.11.0
を使用します。
既知の問題DEV-13039:Kafka brokerがオフラインになると、KafkaWriter0.9または0.10のアプリケーションがクラッシュします。th>
プロパティ
タイプ
デフォルト値
ノート
プロパティ |
タイプ |
デフォルト値 |
ノート |
|||
---|---|---|---|---|---|---|
プロパティ |
タイプ |
デフォルト値 |
||||
必要に応じて、kafkaプロデューサプロパティをセミコロンで区切って指定します。 詳細は以下の表を参照してください。文字列 |
KafkaConfigで指定されたプロデューサプロパティ値のいずれかにセミコロンが含まれている場合は、別の区切り文字を指定します。 |
Kafka設定値区切り文字 |
文字列 |
= |
kafkaconfigで指定されたプロデューサプロパティ値の1つに等号が含まれている場合は、別の区切り文字を指定します。 |
|
メッセージヘッダー |
文字列 |
必要に応じて、カフカ0を使用する場合。11以降の非同期モード、またはKafkaConfig
複数のカスタムヘッダーを指定するには、セミコ必要に応じて、Kafka0.11以降を非同期モードで使用する場合、またはkafkaconfig MessageKey : CName=”Striim”MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)MessageKey : CityName=City; Zipcode=zipMessageKey : CName=”Striim”;Table=@metadata(TableName); Operation=@metadata(OperationName) 他の可能性の中で、このプロパティを使用してログ圧縮をサポートしたり、ダウンストリームアプリケーションがメッセージペイロードに基づ. |
||||
モード |
文字列 |
同期 |
KafkaWriterのモードプロパティの設定を参照してください: 同期と非同期 |
|||
並列スレッド |
整数 |
複数のライターインスタンスの作成を参照してください。 |
||||
パーティションキー |
文字列 |
値が複数のパーティション間でイベントがどのように分散されるかを決定する入力ストリーム 同じパーティションキーフィールド値を持つイベントは、同じパーティションに書き込まれます。 入力ストリームがWAEvent以外の任意のタイプの場合は、そのフィールドのいずれかの名前を指定します。 入力ストリームがWAEventタイプの場合は、構文 |
||||
トピック |
文字列 |
書き込む既存のKafkaトピック(存在しない場合は作成されません)。 複数のKafka Writerが同じトピックに書き込む場合、回復はサポートされません(アプリケーションの回復を参照してください。 (並列スレッドを使用する場合は回復がサポートされます。p> |
このアダプタにはフォーマッタの選択肢があります。 詳細については、”サポートされているwriterとformatterの組み合わせ”を参照してください。
KafkaConfigプロパティに関する注意事項
次の表に記載されている例外を除いて、KafkaConfigで任意のKafka producerプロパティを指定できます。
Kafka producer property |
notes |
---|---|
acks |
|
batch.size linger.ms 再試行 |
|
有効にします。idempotence |
バージョン2.1.0と非同期モードを使用する場合、イベントを順番に書き込むにはtrueに設定します( |
キーを参照してください。deserializer |
値は常にorgです。アパッチカフカ共通。連載。ByteArrayDeserializer、KafkaConfigによってオーバーライドできません |
内部的には、KafkaWriterはさまざまな目的のためにKafkaConsumerを呼び出し、kafkaconfigプロパティを渡すことに起因するコンシューマー APIからの警告は無視しても安全に無視できます。 Kafka producerプロパティの詳細については、Kafkaの設定を参照してください。次のサンプルコードは、PosDataPreview.csv
のデータをKafkaトピックKafkaWriterSample
に書き込みます。 このトピックは、Striimの内部Kafkaインスタンスに既に存在します。 外部Kafkaインスタンスを使用している場合は、アプリケーションを実行する前にトピックを作成する必要があります。Kafka Readerサンプルアプリケーションを実行することで、データがKafkaに書き込まれたことを確認できます。
出力の最初のフィールド(position
)は、回復後のイベントの紛失または重複を避けるために必要な情報を格納します(“アプリケーションの回復”を参照)。 回復が有効になっていない場合、その値はNULLです。
mon
KafkaWriterを使用したターゲットの出力(Monコマンドの使用を参照)には、次のものが含まれます。
-
非同期モードのみで、送信されたバイトレート:ブローカーに送信された毎秒数メガバイト
-
同期モードと非同期モードの両方で、バイトレートを書き込みます。:ブローカーによって書き込まれた秒あたりのメガバイト数とstriimによって受信された確認応答
圧縮の有効化
kafkawriterで圧縮を有効にすると、ブロー No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec
property in KafkaConfig. Supported values are gzip
and snappy
. For example:
KafkaConfg:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type
property in KafkaConfig. Supported values are gzip
lz4
snappy
. For example:
KafkaConfig:'compression.type=snappy'